That is pretty much exactly what I was looking for. Big +1 from me. I was hoping to try this out, but I don't see the `Series` implementation anywhere. (My Alamofire testing is also currently blocked by SR-14506.)
I'm not quite sure that what you've modeled here matches the expected use of `responseStream`. That's fine, as the more expected use is probably simpler. First, there's no need for error handling here; all errors are encapsulated by the `Stream` enum itself. In other adaptations of Alamofire's APIs to `async`, it seemed best to simply expose Alamofire's error encapsulations directly rather than immediately going for a throwing adaptation. In this case, that means providing a series of `Stream` values directly (ignoring `Stream`'s generic nature for now, and the fact that these will probably need to be functions).
```swift
extension DataStreamRequest {
    var stream: Series<Stream> {
        Series(buffering: Stream.self) { continuation in
            continuation.onCancel { self.cancel() }
            responseStream { stream in
                continuation.resume(returning: stream)
                if case .complete = stream {
                    continuation.finish()
                }
            }
        }
    }
}
```
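These prototype names eventually shipped as `AsyncStream`/`AsyncThrowingStream`, so the shape of this adapter can be tried today against the shipped API. A minimal sketch, with a hypothetical `StreamEvent` enum standing in for Alamofire's `Stream`:

```swift
// Hypothetical stand-in for Alamofire's DataStreamRequest.Stream event enum.
enum StreamEvent {
    case stream(String)   // a received value
    case complete         // terminal event
}

// Same adapter shape as above, using AsyncStream (the shipped descendant
// of the Series prototype): yield each event, finish on .complete.
func makeStream(events: [StreamEvent]) -> AsyncStream<StreamEvent> {
    AsyncStream<StreamEvent> { continuation in
        for event in events {
            continuation.yield(event)
            if case .complete = event {
                continuation.finish()
            }
        }
    }
}

// Usage: iterate the events just like the proposed `request.stream` property.
let events = makeStream(events: [.stream("a"), .stream("b"), .complete])
var received: [String] = []
for await event in events {
    if case .stream(let value) = event { received.append(value) }
}
print(received) // ["a", "b"]
```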
I would hope that once we have a `Series` of those events we can somehow adapt them into the form you have (`ThrowingSeries<T>`) for whatever values the `Stream` actually encapsulates.
Additionally, Alamofire already encapsulates cancellation errors, as that's a necessary part of wrapping `URLSession`. In the `Stream` it's emitted as part of the `.completion` `Event`, so if that value can't be caught, we probably just want to end immediately without that value, unfortunately. This is similar to how the 2020 changes to Combine affected Alamofire's publishers: in the event of cancellation, the actual value emitted by cancellation is never received. I'm not sure whether there's a better solution here.
> I would hope that once we have a `Series` of those events we can somehow adapt them into the form you have (`ThrowingSeries<T>`) for whatever values the `Stream` actually encapsulates.
Can you perhaps elaborate on that? I'm not sure what you mean there; are you perhaps speaking of unwrapping the errors from the series by mapping them? E.g. `someDataStreamRequest.stream.map { try $0.someThrowingFunctionOnStream }`, which will end up making an async sequence that can potentially throw.
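A sketch of that shape using `AsyncStream` (the API that eventually shipped from these prototypes), with a hypothetical `unwrap()` standing in for `someThrowingFunctionOnStream`: passing a throwing closure to `map` turns a non-throwing stream into an async sequence that must be iterated with `try await`.

```swift
struct PayloadError: Error {}

// Hypothetical event type with a throwing accessor, standing in for
// Alamofire's Stream and its someThrowingFunctionOnStream.
enum Event {
    case value(Int)
    case failure

    func unwrap() throws -> Int {
        switch self {
        case .value(let v): return v
        case .failure: throw PayloadError()
        }
    }
}

let base = AsyncStream<Event> { continuation in
    continuation.yield(.value(1))
    continuation.yield(.failure)
    continuation.yield(.value(2))
    continuation.finish()
}

// `map` with a throwing closure produces a throwing AsyncSequence,
// so iteration now requires `try await` and ends at the first error.
let values = base.map { try $0.unwrap() }

var collected: [Int] = []
do {
    for try await v in values {
        collected.append(v)
    }
} catch {
    print("caught: \(error)")
}
print(collected) // [1] — iteration stops when .failure throws
```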
One other note on the `Series` design: you can emit a failure inside that `onCancel`, which is pretty neat; that failure/error is guaranteed to be sent before the termination of the stream (provided the cancel happens before any other termination). That deserves an example:
```swift
let s: ThrowingSeries<Int> = ThrowingSeries { continuation in
    continuation.onCancel = {
        continuation.resume(returning: 99)
        continuation.resume(throwing: SomeError())
    }
    detach {
        continuation.resume(yielding: 0)
        sleep(2)
        continuation.resume(yielding: 1)
        sleep(2)
        continuation.resume(yielding: 2)
    }
}

let task = detach {
    do {
        for try await item in s {
            print(item)
        }
    } catch {
        print("error")
    }
}
sleep(3)
task.cancel()
```
That creates a throwing series which, when cancelled, emits 99 and then throws an error; its values are produced concurrently with the rest of the program, emitting 0, 1, and then 2 at 2-second intervals. After 3 seconds it is cancelled, so the output in this case ends up being 0, 1, 99, and then the error. If I understand correctly, that allows you to model exactly what you want with Alamofire (and generally makes it quite adaptable for numerous other APIs out there).
Yes, my hope would be to vend a base `Series<Stream>` and then the user can transform it themselves, pulling out the values they're interested in while staying within an `async` API. Additionally, Alamofire could also vend direct payload streams (`ThrowingSeries<T>`), so it would be easier if we could use a base stream.
That's an interesting solution, but, like the Combine scenario, the cancellation values are produced asynchronously, especially the `URLSessionTaskMetrics`. I could return only the known values, but I'm not sure that would work for every scenario.
I have used `YieldingContinuation` in RSocket to add support for async/await for request streams. You can find the relevant prototype implementation here: rsocket-swift/AsyncAwait.swift at f8295e8a3a15df0f38896c2d45aac3c2be5a568d · rsocket/rsocket-swift · GitHub. As I need buffering, I have also used the example of the `Buffered` actor from @Philippe_Hausler, just slightly modified to support errors.
I initially thought that `YieldingContinuation` would itself conform to `AsyncSequence`. My confusion probably comes from the `next()` method. Both `YieldingContinuation` and `AsyncSequence` have a method called `next()`, with only slightly different signatures: `AsyncSequence` returns an `Optional` and `YieldingContinuation` does not (though it could, if the element type is optional). Maybe we could rename the `next()` method to something else (e.g. `get()`), as it has completely different semantics than the `next()` method of `AsyncSequence`.
That is a perfectly reasonable approach with the way things are set up right now. If you never throw, that lets folks pull out what they need and throw accordingly in maps or filters, etc., which will make the resultant `AsyncSequence` require `try` while iterating or on any reducer. So I think that fits the bill in this case (and numerous others).
So there is a slight bug I have found that can happen with that approach (but it is VERY rare... the worst kind of bug), hence the slight change in the idea.
RSocket has the following:

```swift
public func onNext(_ payload: Payload, isCompletion: Bool) {
    detach { [self] in
        // …
```

That `detach` can mean the values are sent out of order, since detaching is a concurrent execution. It means that the continuation backing the yielding and the buffer really need to share the same critical region.
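The fix can be seen in a sketch with the shipped `AsyncStream` (standing in for the prototypes discussed here): all yields funneled through a single task arrive in yield order, whereas spawning a detached task per value would give no ordering guarantee, because each detached job races to reach the continuation first.

```swift
// All yields go through one serialized context (a single Task), so
// delivery order matches yield order. Doing `Task.detached { … yield … }`
// per value instead would let the yields race and arrive out of order.
let ordered = AsyncStream<Int> { continuation in
    Task {
        for i in 0..<5 {
            continuation.yield(i)
        }
        continuation.finish()
    }
}

var seen: [Int] = []
for await i in ordered {
    seen.append(i)
}
print(seen) // [0, 1, 2, 3, 4]
```

The single task is what provides the shared critical region: producer-side ordering is preserved because there is only one execution context issuing yields.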
I think `Series` might make this a bit easier to write; I will take a few moments this afternoon to write up a sample of what it would look like.
Actually, looking at it further: you are adapting an RxSwift-style demand-based system here. From what I can tell, each call to `next` translates to a demand of 1, which then yields back to the continuation, buffering into the actor. That alleviates your potential races, so perhaps this is one of the few cases where `YieldingContinuation` would be less complicated (and perhaps it can be optimized even further than what you have so far, which looks to be a good start).
The name `next` was chosen specifically to avoid conflation with the idea of `Task.Handle.get()`. Early prototypes of `YieldingContinuation` were named `get()` for that parity, but it was considered confusing that the value would be different when called again, so the parlance of `next` was chosen. In my view, both are equally valid and come with nearly identical levels of name-laden baggage.
Thanks, you are absolutely right! It currently uses a demand of 1, but this will become configurable/dynamic and very likely greater than 1. Otherwise it would introduce a latency of at least one round trip from the client to the server and back, in addition to any processing time.
We definitely need in-order delivery of all messages in RSocket. `ThrowingSeries` sounds like the right tool for this. Do you already have an implementation of `ThrowingSeries`?
Here is a very quickly written prototype of something that would use `ThrowingSeries`:
```swift
public func requestStream(payload: Payload) -> ThrowingSeries<Payload> {
    return ThrowingSeries { continuation in
        final class Context: UnidirectionalStream {
            var continuation: ThrowingSeries<Payload>.Continuation
            var subscription: Subscription!

            init(_ continuation: ThrowingSeries<Payload>.Continuation, payload: Payload, requester: RSocketCore.RSocket) {
                self.continuation = continuation
                subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: self)
            }

            deinit {
                subscription.onCancel()
            }

            func onNext(_ payload: Payload, isCompletion: Bool) {
                continuation.resume(yielding: payload)
            }

            func onComplete() {
                continuation.finish()
            }

            func onRequestN(_ requestN: Int32) {
                assertionFailure("request stream does not support \(#function)")
            }

            func onCancel() {
                continuation.finish()
            }

            func cancel() {
                subscription.onCancel()
            }

            func onError(_ error: Error) {
                continuation.finish(throwing: error)
            }

            func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) {
                assertionFailure("request stream does not support \(#function)")
            }
        }

        let stream = Context(continuation, payload: payload, requester: requester)
        continuation.onCancel = {
            stream.cancel()
        }
    }
}
```
From what I can tell, this achieves the same thing with a bit less code: it looks like it can be done reasonably with 45 new lines and 105 removed lines. Not that brevity is a goal, but it seems simpler to me.
Thanks again. This looks much simpler and is easier to understand. I like it a lot.
Is `onCancel` also called once the `ThrowingSeries` is deallocated, or only through explicit cancellation with the Task API?
The current prototype does not do that; however, that is probably a logic hole in the implementation. Since `onCancel` is intended for the "cleanup phase", it is reasonable to treat the case where the inner storage deinits as cancelled. @nnnnnnnn noted that anything that allocates a resource in the init closure needs some way of cleaning up. I would expect it is reasonable to expect folks to clean up before invoking `finish()`, but in the other cases, where the `Series`/`ThrowingSeries` has no more references, it should also clean up via `onCancel`.
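For comparison, `AsyncStream`, which grew out of these prototypes, closes this hole with `onTermination`: the handler fires both for explicit finishes (`.finished`) and when the consuming task is cancelled or the stream is dropped (`.cancelled`). A minimal check of the finish case:

```swift
// Tiny reference-type flag so the @Sendable onTermination closure can
// record that it ran (@Sendable closures cannot capture mutable local vars).
final class Flag: @unchecked Sendable {
    var finished = false
}

let flag = Flag()
let stream = AsyncStream<Int> { continuation in
    // onTermination fires for explicit finish (.finished) and also for
    // consumer cancellation or an unconsumed, dropped stream (.cancelled).
    continuation.onTermination = { reason in
        if case .finished = reason {
            flag.finished = true
        }
    }
    continuation.yield(1)
    continuation.finish()
}

var total = 0
for await value in stream {
    total += value
}
print(total, flag.finished) // 1 true
```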