Still out on holiday for a bit, but hopping back over here to chime in - This API is quite useful. I have seen variants of this being used pretty extensively and for good reason too. When originally designing AsyncStream this was one of the considerations early on but was dropped because of the slight oddity of the construction. Retrospectively looking at it, this perhaps was the more general way of approaching the problem; the only advantage that the closure gives is isolation of where the Continuation is scoped to.
All in all I think this is a good fit, a good tool, and helps developers avoid confusion about the escapes of the continuation and usages of force unwrapping. For the cases where it is useful, it is quite honestly the right tool for the job.
However... there is one danger zone that the API makes easily tempting to go down: that of the structure versus the tuple. Personally I am of the belief that 9 times out of 10 (or more) tuples are the wrong choice for the job and structures are better picks because of extensibility and general ergonomics. This API is one of the exceptions to that belief. The reason is that with a structure it is very tempting to store that type somewhere for vending out to multiple consumers. This immediately becomes a bit of a sticky situation because of iteration on multiple tasks. Being a tuple, it isolates the concerns of each item and implies the result is for independent consumption, which in my view aids in using the API in ways that result in expected behaviors.
Keeping it as a tuple return reinforces the construction nature of things and more importantly isolates the concerns of sending and the sequence part.
I also feel that back deploying this is reasonable and warranted - it prevents folks from writing wonky code that might be error prone and offers ergonomics that, as I said before, perhaps should have been there from the start.
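For readers following along, here is a rough sketch of the pattern being discussed: the force-unwrapped, escaped continuation that people write by hand today, wrapped in a factory that returns a tuple. The names `makeStreamSketch` and `collectSketch` and the tuple labels are assumptions made for illustration, not the pitched API itself.

```swift
// Hypothetical helper: builds the stream with the existing closure-based
// initializer and escapes the continuation, which is the step that usually
// needs an implicitly unwrapped optional.
func makeStreamSketch<Element>(
    of elementType: Element.Type = Element.self
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
    var continuation: AsyncStream<Element>.Continuation!
    // The build closure runs synchronously, so `continuation` is set
    // before the initializer returns.
    let stream = AsyncStream<Element> { continuation = $0 }
    return (stream, continuation)
}

// Hypothetical usage: yield a few values, finish, then iterate.
func collectSketch() async -> [Int] {
    let (stream, continuation) = makeStreamSketch(of: Int.self)
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()

    var received: [Int] = []
    for await element in stream {
        received.append(element)
    }
    return received
}
```

Destructuring the tuple at the call site keeps the sending end and the sequence as two separate values, which is the isolation of concerns argued for above.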
Seeing as we're now escaping the closure anyway, is there a reason we couldn't adapt AsyncStream to accept a Continuation – or more likely a type that wraps the continuation – as a parameter for a new initialiser on AsyncStream:
With AsyncStreamSource having essentially the same API as the AsyncStream's Continuation type.
The one issue I see is that you could initialise multiple AsyncStreams with the same storage, but that would simply be programmer error, not dissimilar to attempting to iterate an AsyncStream from multiple tasks.
API wise it feels a bit cleaner than the tuple setup, and while it might take a bit of juggling under the hood, it shouldn't impact any existing APIs.
As we explore some alternate solutions regarding the ergonomics of the API, I’d like to mention the case of AsyncChannel in Swift async algorithms. This type acts as a communication bus between tasks, by allowing some to send and some others to iterate over it. AsyncChannel conforms to AsyncSequence and offers a suspending function to send elements (and a non suspending one to finish).
I feel that what we are trying to achieve by giving a « factory » function to AsyncStream that returns both a « sender » type and an AsyncSequence is very close to the notion of Channel.
In the end I fear that the coexistence in the same code base of AsyncChannel and AsyncStream, each one offering a different instantiation and usage mechanism, will be confusing.
let channel = AsyncChannel<Int>()
await channel.send(1)
…
for await element in channel { … }
let (continuation, stream) = AsyncStream<Int>.makeStream()
continuation.yield(1)
…
for await element in stream { … }
IMO both are made to make tasks communicate but offer a different back pressure management.
Shouldn’t we try to harmonise that to improve the discoverability and readability of the API?
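To illustrate the AsyncStream side of that difference, here is a small sketch. `yield` never suspends, so the stream deals with a fast producer through its buffering policy rather than by making the sender wait, whereas AsyncChannel's `send` suspends. The function name `bufferingSketch` is made up for this example.

```swift
// Hypothetical demo: a stream that keeps only the newest element.
func bufferingSketch() async -> (results: [String], received: [Int]) {
    var continuation: AsyncStream<Int>.Continuation!
    let stream = AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(1)) {
        continuation = $0
    }

    // Yield three values before anyone iterates; the producer is told
    // synchronously what happened to each one.
    var results: [String] = []
    for value in 1...3 {
        switch continuation.yield(value) {
        case .enqueued:
            results.append("enqueued")
        case .dropped(let element):
            results.append("dropped \(element)")
        case .terminated:
            results.append("terminated")
        @unknown default:
            results.append("unknown")
        }
    }
    continuation.finish()

    // Only what survived in the buffer reaches the consumer.
    var received: [Int] = []
    for await element in stream {
        received.append(element)
    }
    return (results, received)
}
```

With a buffer of one, the consumer should only see the last value that was yielded; the earlier ones are reported back to the producer as dropped instead of slowing it down.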
One thing that I think is important in this conversation is the ability for developers to express which types or scopes have the responsibility to send events on a stream vs merely receiving them. It makes code much easier to reason about.
One advantage of separating the continuation from the stream is that it allows developers to bestow this responsibility. Passing a channel around, we’d have to type erase / lean on existentials to make the sending/receiving responsibility clear, right?
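A minimal sketch of what bestowing that responsibility could look like, assuming a producer type that is handed only the continuation and a consumer function that is handed only the stream. All names here (`TemperatureProducerSketch`, `averageSketch`, `responsibilityDemoSketch`) are hypothetical.

```swift
// Hypothetical producer: it can send and finish, but has no way to iterate.
struct TemperatureProducerSketch {
    let continuation: AsyncStream<Double>.Continuation

    func publish(_ readings: [Double]) {
        for reading in readings {
            continuation.yield(reading)
        }
        continuation.finish()
    }
}

// Hypothetical consumer: it can iterate, but has no way to send.
func averageSketch(of readings: AsyncStream<Double>) async -> Double {
    var sum = 0.0
    var count = 0
    for await reading in readings {
        sum += reading
        count += 1
    }
    return count == 0 ? 0 : sum / Double(count)
}

// Wires the two ends together using the closure-based initializer.
func responsibilityDemoSketch() async -> Double {
    var continuation: AsyncStream<Double>.Continuation!
    let stream = AsyncStream<Double> { continuation = $0 }

    TemperatureProducerSketch(continuation: continuation).publish([20, 22])
    return await averageSketch(of: stream)
}
```

The signatures alone say who sends and who receives, without any type erasure.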
Maybe @twittemb’s onto something here. Maybe async channels need to live in the standard library and async streams be created from channels. Then the need for an escaping continuation starts to disappear.
Yes, I think we should pass the AsyncSequence protocol when only consumption is expected; this is the read-only version of the AsyncChannel (the primary associated type should help, when implemented).
Thanks for all the feedback and healthy discussions!
In my opinion, this is too dangerous and too easy to misuse. As you already pointed out, you could then pass the Continuation/Source to many AsyncStream.inits and that would not be okay. I think we shouldn't try to change the world here, but rather just make the AsyncStream creation more convenient.
Just going to pick this one quote here but replying to all that you said. I disagree with the statement. AsyncStream and AsyncChannel are catering to two very separate use-cases. AsyncStream allows sync-to-async bridges to be built, like bridging delegate based APIs (while giving some means of back pressure). Of course it also supports an async-to-async bridge. AsyncChannel on the other hand only allows async-to-async communication (by the fact that func send() is async).
The general notion of separating the producing interface of a root AsyncSequence, e.g. AsyncStream.Continuation or NIOAsyncSequenceProducer.Source, from the consuming one, i.e. the AsyncSequence, is in my opinion good and makes it easier to implement things like early termination. However, I really believe this needs to be evaluated on a type by type basis and is not something that this proposal aims to unify. It is something that should be discussed in the AsyncChannel review.
I don't think AsyncChannel should live in the standard library. Like with many other things that have been discussed on these forums, the stdlib should not become a place to just put everything. Having important things live in separate packages is IMO good.
I agree with the general notion that only the required interface should be passed when only that one is needed. However, I don't think passing an AsyncSequence protocol is the right approach. First, it is currently not possible since AsyncSequence hasn't adopted primary associated types right now. Even if it does, we should not arbitrarily pass any AsyncSequence<Foo> everywhere since that has performance implications due to the existential. In a lot of cases, having a more concrete AsyncSequence implementation that is exposed is better for both performance and evolvability of the API. What I have recommended in a few places already is to create a concrete type like FooAsyncSequence and back it with something like AsyncStream. This allows you to switch the underlying type without breaking API.
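To make the sync-to-async case concrete, here is a rough sketch of bridging a callback-style source into an AsyncStream. `CallbackSourceSketch` and its `onValue`/`onFinish` properties are invented for this example and stand in for whatever delegate or callback API is being bridged.

```swift
// Hypothetical synchronous, callback-based source.
final class CallbackSourceSketch {
    var onValue: ((Int) -> Void)?
    var onFinish: (() -> Void)?

    // Delivers values synchronously, as a delegate callback would.
    func emit(_ values: [Int]) {
        for value in values {
            onValue?(value)
        }
        onFinish?()
    }
}

// Bridges the callbacks into an AsyncStream. The callbacks can call `yield`
// directly because it is synchronous and does not suspend.
func bridgeSketch(_ source: CallbackSourceSketch) -> AsyncStream<Int> {
    AsyncStream { continuation in
        source.onValue = { value in
            continuation.yield(value)
        }
        source.onFinish = {
            continuation.finish()
        }
    }
}

func bridgeDemoSketch() async -> [Int] {
    let source = CallbackSourceSketch()
    let stream = bridgeSketch(source)
    source.emit([1, 2, 3])

    var received: [Int] = []
    for await element in stream {
        received.append(element)
    }
    return received
}
```

Nothing on the producing side needs to be async here, which is the part a suspending `send()` cannot offer.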
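A minimal sketch of that recommendation, assuming an `Int` element type purely for illustration. The public surface is the concrete `FooAsyncSequence`; the AsyncStream behind it is an implementation detail that could be swapped out later.

```swift
// Concrete sequence type exposed in the API; callers never see AsyncStream.
struct FooAsyncSequence: AsyncSequence {
    typealias Element = Int

    // Backing storage: an implementation detail (assumed to be an
    // AsyncStream for this sketch).
    private let base: AsyncStream<Int>

    init(base: AsyncStream<Int>) {
        self.base = base
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: AsyncStream<Int>.Iterator

        mutating func next() async -> Int? {
            await base.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: base.makeAsyncIterator())
    }
}

// Hypothetical demo: fill a stream, wrap it, and consume the wrapper.
func fooDemoSketch() async -> [Int] {
    var continuation: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { continuation = $0 }
    continuation.yield(10)
    continuation.yield(20)
    continuation.finish()

    var received: [Int] = []
    for await element in FooAsyncSequence(base: stream) {
        received.append(element)
    }
    return received
}
```

Because consumers only depend on `FooAsyncSequence`, there is no existential involved and the backing type can change without breaking API.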
Thank you for the proposal. I've been reaching for AsyncStreams frequently the last few months and I think the original API shape led to a lot of confusion at first because it seemed wrong to escape the continuation, but that is actually okay in this case. While I do agree that this proposal makes the instructions for using this API much clearer, I do worry about the safety.
By making continuation part of the return type, it signals that it is meant to be used, but in doing so, I think it also clouds the understanding that the continuation is meant to be the (singular) other end of the AsyncStream "pipe". So I worry that this piece of the proposal would inadvertently suggest to the reader that they should be able to pass that continuation around to whoever wants it...turning a single entry/exit point into a multi-entry/single exit point.
One thing that I've done in my usage of AsyncStream is to effectively create a stateful factory of AsyncStreams that shields both the producer and consumer of the stream from the idea of a continuation. I don't recall who gave me this inspiration but this wasn't my idea.
import Foundation // for UUID

public actor AsyncStreamManager<T: Sendable> {
    // One continuation per stream handed out by makeStream().
    // Note: entries are only removed when finish(_:) is called.
    private var continuations: [UUID: AsyncStream<T>.Continuation]

    public init() {
        self.continuations = [:]
    }

    private func insert(continuation: AsyncStream<T>.Continuation) {
        let uuid = UUID()
        continuations[uuid] = continuation
    }

    // Broadcasts a value to every stream created so far.
    public func send(_ value: T) {
        continuations.values.forEach { continuation in
            continuation.yield(value)
        }
    }

    // Sends one last value, then finishes and forgets every stream.
    public func finish(_ value: T) {
        send(value)
        continuations.values.forEach { continuation in
            continuation.finish()
        }
        continuations.removeAll()
    }

    public func makeStream() -> AsyncStream<T> {
        let stream = AsyncStream { continuation in
            insert(continuation: continuation)
        }
        return stream
    }
}
I wonder if a similar idea should be employed as part of the AsyncStream API? The continuation is ancillary but a bit distracting to the purpose of AsyncStream which is to consume values over time. If we can help developers by not needing to worry about the medium by which they can publish their values, I think it would go a long way to making AsyncStream safer.
All that said, I'm by no means a language expert, just a guy with a couple thoughts.
The continuation of an AsyncStream is thread-safe (hence Sendable), which is partly why the original API seemed so restrictive. So effectively an AsyncStream is multi-producer, but unfortunately not multi-consumer.
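As a rough illustration of the multi-producer side, here is a sketch in which several child tasks yield into the same continuation while a single consumer iterates. The function name `multiProducerSketch` is made up, and the result is sorted because the order in which the tasks yield is not deterministic.

```swift
func multiProducerSketch() async -> [Int] {
    var continuation: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { continuation = $0 }

    // Immutable copy so the Sendable continuation can be captured
    // by the concurrently executing child tasks.
    let producer: AsyncStream<Int>.Continuation = continuation

    // Four producers share one continuation.
    await withTaskGroup(of: Void.self) { group in
        for id in 0..<4 {
            group.addTask {
                producer.yield(id)
            }
        }
    }
    producer.finish()

    // A single consumer drains everything that was yielded.
    var received: [Int] = []
    for await element in stream {
        received.append(element)
    }
    return received.sorted()
}
```

The reverse does not hold: the sketch deliberately keeps one iterating task, since iterating the same stream from multiple tasks is the case the thread warns about.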