[Concurrency] YieldingContinuation

That is pretty much exactly what I was looking for. Big +1 from me.

I was hoping to try this out but I don't see the Series implementation anywhere. (My Alamofire testing is also currently blocked by SR-14506.)

I'm not quite sure that what you've modeled here matches the expected use of responseStream. That's fine, since the more expected use is probably simpler. First, there's no need for error handling here; all errors are encapsulated by the Stream value itself. In other adaptations of Alamofire's APIs to async, it seemed best to expose Alamofire's error encapsulations directly rather than immediately going for a throwing adaptation. In this case, that means providing a series of Stream values directly (ignoring Stream's generic nature for now, and the fact that these will probably need to be functions):

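    extension DataStreamRequest {
      var stream: Series<Stream> {
        Series(buffering: Stream.self) { continuation in
          // Cancelling the consuming task cancels the underlying request.
          continuation.onCancel = { self.cancel() }
          responseStream { stream in
            continuation.resume(yielding: stream)

            // .complete is the last event Alamofire delivers, so end the series there.
            if case .complete = stream.event {
              continuation.finish()
            }
          }
        }
      }
    }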
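Series never shipped under that name; as far as I know it evolved into AsyncStream (SE-0314), where resume(yielding:) became yield(_:) and onCancel became onTermination. Below is a self-contained sketch of the same bridge written against that API. MockStreamRequest and Event are hypothetical stand-ins for DataStreamRequest and its Stream value, not Alamofire's actual types, and the mock delivers its events synchronously.

```swift
// Hypothetical stand-in for Alamofire's stream events (not Alamofire's real types).
enum Event: Equatable {
    case value(Int)
    case complete
}

// Hypothetical callback-based request that replays a fixed script of events, in order.
// @unchecked Sendable because this single-threaded mock is captured by onTermination.
final class MockStreamRequest: @unchecked Sendable {
    private(set) var cancelled = false
    private let script: [Event]

    init(script: [Event]) { self.script = script }

    func responseStream(_ handler: (Event) -> Void) {
        for event in script where !cancelled { handler(event) }
    }

    func cancel() { cancelled = true }

    // Bridges the callback API into an AsyncStream of events.
    var events: AsyncStream<Event> {
        AsyncStream<Event> { continuation in
            // Cancelling the consuming task, or dropping the stream early, cancels the request.
            continuation.onTermination = { @Sendable termination in
                if case .cancelled = termination { self.cancel() }
            }
            self.responseStream { event in
                continuation.yield(event)
                // .complete is the last event, so end the stream there.
                if case .complete = event { continuation.finish() }
            }
        }
    }
}

let request = MockStreamRequest(script: [.value(1), .value(2), .complete])
var received: [Event] = []
for await event in request.events {
    received.append(event)
}
```

Because only a .cancelled termination is forwarded, a stream that ends through .complete does not cancel the request.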

I would hope that once we have a Series of those events we can somehow adapt them into the form you have (ThrowingSeries<T>) for whatever values the Stream actually encapsulates.

Additionally, Alamofire already encapsulates cancellation errors, as that's a necessary part of wrapping URLSession. In the Stream, cancellation is emitted as part of the .complete event, so if that value can't be caught, we probably just want to end immediately without it, unfortunately. This is similar to how the 2020 changes to Combine affected Alamofire's publishers: in the event of cancellation, the value emitted by the cancellation is never received. I'm not sure whether there's a better solution here.

I would hope that once we have a Series of those events we can somehow adapt them into the form you have (ThrowingSeries<T>) for whatever values the Stream actually encapsulates.

Can you perhaps elaborate on that? I am not sure what you mean there. Are you perhaps speaking of unwrapping the errors from the series by mapping them? e.g. someDataStreamRequest.stream.map { try $0.someThrowingFunctionOnStream }, which would end up producing an async sequence that can throw.
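With the AsyncSequence API as it exists today, that kind of mapping can be written with the throwing overload of compactMap, which produces an AsyncThrowingCompactMapSequence. A sketch, assuming a hypothetical Event enum whose values carry a Result (loosely modeled on the shape of Alamofire's stream events), with an AsyncStream standing in for the Series:

```swift
struct DecodingFailure: Error, Equatable {
    let reason: String
}

// Hypothetical event type: values carry a Result, and .complete marks the end.
enum Event {
    case value(Result<Int, DecodingFailure>)
    case complete
}

// Turns a non-throwing sequence of events into a throwing sequence of payloads.
func payloads<S: AsyncSequence>(_ events: S) -> AsyncThrowingCompactMapSequence<S, Int>
where S.Element == Event {
    events.compactMap { event -> Int? in
        switch event {
        case .value(let result):
            return try result.get() // rethrows the wrapped failure
        case .complete:
            return nil              // drop the terminal marker
        }
    }
}

let events = AsyncStream<Event> { continuation in
    continuation.yield(.value(.success(1)))
    continuation.yield(.value(.success(2)))
    continuation.yield(.value(.failure(DecodingFailure(reason: "bad bytes"))))
    continuation.yield(.complete)
    continuation.finish()
}

var received: [Int] = []
var caught: DecodingFailure?
do {
    for try await value in payloads(events) {
        received.append(value)
    }
} catch {
    caught = error as? DecodingFailure
}
```

The base sequence stays non-throwing; only the mapped view requires try, which matches the idea of vending the raw events and letting callers opt into throwing.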

One other note on the Series design: you can emit a failure inside that onCancel, which is pretty neat, and that failure/error is guaranteed to be sent before the stream terminates (provided the cancel happens before any other termination). That deserves an example:

    import Foundation

    struct SomeError: Error {}

    let s: ThrowingSeries<Int> = ThrowingSeries { continuation in
      // On cancellation, emit a final value and then an error; both arrive before termination.
      continuation.onCancel = {
        continuation.resume(yielding: 99)
        continuation.resume(throwing: SomeError())
      }
      detach {
        continuation.resume(yielding: 0)
        sleep(2)
        continuation.resume(yielding: 1)
        sleep(2)
        continuation.resume(yielding: 2)
      }
    }
    let task = detach {
      do {
        for try await item in s {
          print(item)
        }
      } catch {
        print("error")
      }
    }
    sleep(3)
    task.cancel()

That creates a throwing series which, when cancelled, emits 99 and then throws an error; its values are produced concurrently with the rest of the program, emitting 0, 1, and then 2 at two-second intervals.
After 3 seconds the task is cancelled, so the output in this case is 0, 1, 99, and then error. If I understand correctly, that lets you model exactly what you want with Alamofire (and generally makes it adaptable to numerous other APIs).
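For comparison, here is a sketch of cancellation with AsyncThrowingStream, which as far as I know is the shipped successor of ThrowingSeries. There, cancellation is observed through onTermination, and once the stream has terminated a later yield is rejected with .terminated. To keep it deterministic, the consumer cancels its own task after receiving 1 instead of relying on sleep. TerminationLog and runCancellationDemo are hypothetical names, and this sketch does not attempt to reproduce the 99-then-error behavior of the Series prototype.

```swift
import Foundation

// Hypothetical thread-safe log of termination reasons (onTermination is @Sendable).
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func record(_ entry: String) {
        lock.lock()
        entries.append(entry)
        lock.unlock()
    }

    var all: [String] {
        lock.lock()
        defer { lock.unlock() }
        return entries
    }
}

func runCancellationDemo() async -> (received: [Int], terminations: [String], lateYieldRejected: Bool) {
    let log = TerminationLog()
    var captured: AsyncThrowingStream<Int, Error>.Continuation!
    let stream = AsyncThrowingStream<Int, Error> { continuation in
        continuation.onTermination = { @Sendable termination in
            if case .cancelled = termination {
                log.record("cancelled")
            } else {
                log.record("finished")
            }
        }
        captured = continuation
    }

    // The producer has already emitted 0 and 1, which sit in the stream's buffer.
    captured.yield(0)
    captured.yield(1)

    let consumer = Task { () -> [Int] in
        var received: [Int] = []
        do {
            for try await value in stream {
                received.append(value)
                // Deterministic stand-in for task.cancel() after three seconds.
                if value == 1 {
                    withUnsafeCurrentTask { $0?.cancel() }
                }
            }
        } catch {
            // Not expected: cancellation ends the loop by returning nil, not by throwing.
        }
        return received
    }
    let received = await consumer.value

    // The stream is terminated now, so a late yield is rejected rather than buffered.
    var lateYieldRejected = false
    if case .terminated = captured.yield(99) {
        lateYieldRejected = true
    }
    return (received, log.all, lateYieldRejected)
}

let result = await runCancellationDemo()
```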


Yes, my hope would be to vend a base Series<Stream> and let users transform it themselves, pulling out the values they're interested in while staying in an async API. Alamofire could also vend direct payload streams (ThrowingSeries<T>), which would be easier to build on top of such a base stream.

That's an interesting solution, but, as in the Combine scenario, the cancellation values are produced asynchronously, especially the URLSessionTaskMetrics. I could return only the known values, but I'm not sure that would work for every scenario.

I have used YieldingContinuation in RSocket to add async/await support for request streams.
You can find the relevant prototype implementation here: rsocket-swift/AsyncAwait.swift at f8295e8a3a15df0f38896c2d45aac3c2be5a568d · rsocket/rsocket-swift · GitHub
As I need buffering, I have also used the example of the Buffered actor from @Philippe_Hausler , just slightly modified to support errors.

I initially thought that YieldingContinuation would itself conform to AsyncSequence.
My confusion probably comes from the next() method. Both YieldingContinuation and AsyncSequence's iterator have a method called next(), with only slightly different signatures: the AsyncSequence one returns an Optional and YieldingContinuation's does not (though it could, if the Element type is optional). Maybe we can rename the next() method to something else (e.g. get()), as its semantics are completely different from those of next() in AsyncSequence.
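For reference, AsyncIteratorProtocol's next() returns an Optional and uses nil to mean the sequence is exhausted, which is where the semantics diverge from a continuation that hands out the next yielded value. A minimal custom AsyncSequence (Countdown is a made-up type) shows that contract, including that this iterator keeps returning nil once the end is reached:

```swift
// A made-up AsyncSequence counting down from `start` to 1.
struct Countdown: AsyncSequence {
    typealias Element = Int
    let start: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int

        // Returning nil tells `for await` that the sequence is exhausted.
        mutating func next() async -> Int? {
            guard current > 0 else { return nil }
            defer { current -= 1 }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(current: start)
    }
}

var values: [Int] = []
for await value in Countdown(start: 3) {
    values.append(value)
}

// Calling next() past the end keeps returning nil.
var iterator = Countdown(start: 1).makeAsyncIterator()
let first = await iterator.next()
let second = await iterator.next()
let third = await iterator.next()
```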

That is a perfectly reasonable approach with the way things are set up right now. If you never throw, folks can pull out what they need and throw accordingly in map, filter, etc., which makes the resulting AsyncSequence require try while iterating, or try on any reducer. So I think that fits the bill in this case (and numerous others).

I have found a slight bug that can happen with that approach (it is VERY rare... the worst kind of bug), hence the slight change in the idea.

RSocket has the following:

public func onNext(_ payload: Payload, isCompletion: Bool) {
        detach { [self] in

That detach can cause the values to be sent out of order, since each detached task executes concurrently. What's really needed is for the continuation backing the yield and the buffer to share the same critical region, so that both are updated atomically.
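A sketch of the ordering fix using AsyncStream instead of the Series prototype: yield directly inside the callback rather than spawning a task per value, so values enter the stream's buffer in the order the callbacks run. CallbackSource is a hypothetical stand-in for RSocket's onNext/onComplete, delivering on a serial DispatchQueue the way a network event loop might.

```swift
import Dispatch

// Hypothetical callback source standing in for RSocket's onNext/onComplete.
// @unchecked Sendable: callbacks are set before start() and only read on its queue.
final class CallbackSource: @unchecked Sendable {
    var onNext: ((Int) -> Void)?
    var onComplete: (() -> Void)?

    // Delivers callbacks one after another on a serial queue.
    func start(count: Int) {
        let queue = DispatchQueue(label: "callback-source")
        queue.async {
            for i in 0..<count { self.onNext?(i) }
            self.onComplete?()
        }
    }
}

func orderedStream(from source: CallbackSource, count: Int) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        // Yield synchronously inside the callback; a detached task per value
        // could run concurrently with the others and reorder the values.
        source.onNext = { value in continuation.yield(value) }
        source.onComplete = { continuation.finish() }
        source.start(count: count)
    }
}

let source = CallbackSource()
var received: [Int] = []
for await value in orderedStream(from: source, count: 1_000) {
    received.append(value)
}
```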

I think Series might make this a bit easier to write; I will take a few moments this afternoon to write up a sample of what it would look like.


Actually, looking at it further: you are adapting an RxSwift-style demand-based system here. From what I can tell, each call to next translates to a demand of 1, and the resulting value is yielded back to the continuation, which buffers into the actor. That alleviates the potential for a race, so perhaps this is one of the few cases where YieldingContinuation is less complicated (and perhaps it can be optimized even further than what you have so far, which looks like a good start).

The name next was chosen specifically to avoid conflation with the idea of Task.Handle.get(). Early prototypes of YieldingContinuation named it get() for that parity, but it was considered confusing that the value would be different when called again, so next was chosen. In my view both names are equally valid and come with nearly identical amounts of baggage.

Thanks, you are absolutely right! It currently uses a demand of 1 but this will be configurable/dynamic and very likely greater than 1. Otherwise it would introduce a latency of at least one round-trip from the client to the server and back, in addition to any processing time.
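A rough sketch of batched demand: the iterator keeps a local credit and only calls request(n) when the credit runs out, so a batch larger than 1 means fewer round trips. MockRemote, DemandDrivenSequence, and run(total:batch:) are hypothetical; MockRemote answers synchronously, whereas a real RSocket requester would deliver elements later over the network.

```swift
// Hypothetical remote peer: request(n) asks for n more elements, which arrive
// through the stream's continuation (synchronously here, over the network in RSocket).
final class MockRemote {
    private var nextValue = 0
    private let total: Int
    private let continuation: AsyncStream<Int>.Continuation
    private(set) var requests: [Int] = []

    init(total: Int, continuation: AsyncStream<Int>.Continuation) {
        self.total = total
        self.continuation = continuation
    }

    func request(_ n: Int) {
        requests.append(n)
        for _ in 0..<n where nextValue < total {
            continuation.yield(nextValue)
            nextValue += 1
        }
        if nextValue == total { continuation.finish() }
    }
}

// Wraps the stream and turns consumer pulls into batched demand.
struct DemandDrivenSequence: AsyncSequence {
    typealias Element = Int
    let stream: AsyncStream<Int>
    let remote: MockRemote
    let batch: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: AsyncStream<Int>.Iterator
        let remote: MockRemote
        let batch: Int
        var credit = 0

        mutating func next() async -> Int? {
            // Only go back to the remote once the previous batch is used up.
            if credit == 0 {
                remote.request(batch)
                credit = batch
            }
            credit -= 1
            return await base.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: stream.makeAsyncIterator(), remote: remote, batch: batch)
    }
}

func run(total: Int, batch: Int) async -> (values: [Int], requests: [Int]) {
    var captured: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { captured = $0 }
    let remote = MockRemote(total: total, continuation: captured)
    var values: [Int] = []
    for await value in DemandDrivenSequence(stream: stream, remote: remote, batch: batch) {
        values.append(value)
    }
    return (values, remote.requests)
}

let batched = await run(total: 5, batch: 2)
let oneAtATime = await run(total: 5, batch: 1)
```

With a batch of 2, five elements cost three requests, versus six with a batch of 1 (the last request returns nothing because the stream has already finished).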

We definitely need in-order delivery of all messages in RSocket. ThrowingSeries sounds like the right tool for this. Do you already have an implementation for ThrowingSeries?

Here is a very quickly written prototype of something that would use ThrowingSeries:

    public func requestStream(payload: Payload) -> ThrowingSeries<Payload> {
        return ThrowingSeries { continuation in
            final class Context: UnidirectionalStream {
                var continuation: ThrowingSeries<Payload>.Continuation
                var subscription: Subscription!

                init(_ continuation: ThrowingSeries<Payload>.Continuation, payload: Payload, requester: RSocketCore.RSocket) {
                    self.continuation = continuation
                    subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: self)
                }

                deinit {
                    subscription.onCancel()
                }

                func onNext(_ payload: Payload, isCompletion: Bool) {
                    continuation.resume(yielding: payload)
                }
                
                func onComplete() {
                    continuation.finish()
                }
                
                func onRequestN(_ requestN: Int32) {
                    assertionFailure("request stream does not support \(#function)")
                }
                
                func onCancel() {
                    continuation.finish()
                }

                func cancel() {
                    subscription.onCancel()
                }
                
                func onError(_ error: Error) {
                    continuation.finish(throwing: error)
                }
                
                func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) {
                    assertionFailure("request stream does not support \(#function)")
                }
            }
            let stream = Context(continuation, payload: payload, requester: requester)
            continuation.onCancel = {
                stream.cancel()
            }
        }
    }

From what I can tell, this achieves the same thing with a bit less code: about 45 new lines and 105 removed lines. Brevity is not a goal in itself, but it seems simpler to me.
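The same adapter shape against AsyncThrowingStream, in self-contained form. Responder's callbacks mirror the UnidirectionalStream methods above, but Responder, StreamFailure, and requestStream(feed:) are hypothetical stand-ins rather than RSocket's API, and onTermination plays the role of onCancel.

```swift
struct StreamFailure: Error, Equatable {
    let code: Int
}

// Hypothetical responder whose callbacks mirror the UnidirectionalStream methods above.
// @unchecked Sendable: it is captured by the @Sendable onTermination closure.
final class Responder: @unchecked Sendable {
    private let continuation: AsyncThrowingStream<String, Error>.Continuation
    private(set) var cancelledRemotely = false

    init(_ continuation: AsyncThrowingStream<String, Error>.Continuation) {
        self.continuation = continuation
    }

    func onNext(_ payload: String) { continuation.yield(payload) }
    func onComplete() { continuation.finish() }
    func onError(_ error: Error) { continuation.finish(throwing: error) }

    // In a real client this would tell the peer to stop sending.
    func cancel() { cancelledRemotely = true }
}

// Hypothetical entry point; `feed` simulates the peer invoking the callbacks.
func requestStream(feed: (Responder) -> Void) -> (AsyncThrowingStream<String, Error>, Responder) {
    var responder: Responder!
    let stream = AsyncThrowingStream<String, Error> { continuation in
        let r = Responder(continuation)
        // Only a consumer-side cancellation is forwarded to the peer.
        continuation.onTermination = { @Sendable termination in
            if case .cancelled = termination { r.cancel() }
        }
        responder = r
    }
    feed(responder)
    return (stream, responder)
}

let (stream, responder) = requestStream { peer in
    peer.onNext("a")
    peer.onNext("b")
    peer.onError(StreamFailure(code: 7))
}

var payloads: [String] = []
var failure: StreamFailure?
do {
    for try await payload in stream {
        payloads.append(payload)
    }
} catch {
    failure = error as? StreamFailure
}
```

Buffered payloads are delivered before the error is thrown, and because the stream ended through finish(throwing:), the peer is not told to cancel.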


Thanks again. This looks much simpler and is easier to understand. I like it a lot.

Is onCancel also called once ThrowingSeries is deallocated or only through explicit cancellation with the Task API?

The current prototype does not do that, but that is probably a logic hole in the implementation. Since onCancel is intended for the "cleanup phase", it is reasonable to treat the series as cancelled when its inner storage is deinitialized.

@nnnnnnnn noted that any resource allocated in the init closure needs some way of being cleaned up. I think it is reasonable to expect folks to clean up before invoking finish(), but in the other case, where the Series/ThrowingSeries has no more references, it should also clean up via onCancel.
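For what it's worth, the AsyncStream that eventually shipped behaves this way, as far as I know: onTermination is called with .cancelled when the stream is deallocated without having been finished, and with .finished when finish() is called. A small sketch; Resource and makeStream(using:finish:) are hypothetical names.

```swift
import Foundation

// Hypothetical resource allocated in the build closure; records why it was released.
final class Resource: @unchecked Sendable {
    private let lock = NSLock()
    private var reason: String?

    func release(reason: String) {
        lock.lock()
        self.reason = reason
        lock.unlock()
    }

    var releaseReason: String? {
        lock.lock()
        defer { lock.unlock() }
        return reason
    }
}

func makeStream(using resource: Resource, finish: Bool) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        // Single cleanup path for finish(), task cancellation, and deallocation.
        continuation.onTermination = { @Sendable termination in
            if case .cancelled = termination {
                resource.release(reason: "cancelled")
            } else {
                resource.release(reason: "finished")
            }
        }
        continuation.yield(1)
        if finish { continuation.finish() }
    }
}

// Never iterated or finished: the stream is dropped as soon as this statement ends.
let droppedResource = Resource()
_ = makeStream(using: droppedResource, finish: false)

// Finished by the producer and fully consumed.
let finishedResource = Resource()
var values: [Int] = []
for await value in makeStream(using: finishedResource, finish: true) {
    values.append(value)
}
```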
