I know it's a few days last the review deadline but it slipped past me with so much concurrency stuff happening... concurrently (?). I was asked to give this a review and drop some notes though so better late than after conclusions are posted...
This feedback was already shared with proposal authors and the core team - there's nothing new here. But for the sake of public record I thought it is fair to include the notes in the official review thread.
Overall type usefulness
Yes, the type, and general pattern, is of great value and I have no doubt about its utility.
I'm very supportive of making this happen in general!
I have prior experience on the strong need for such types and can also see where we need it in Swift today. Unless we provide an implementation, there will be plenty of semi-broken ad-hoc implementations of the pattern floating around; It really makes sense to have one blessed impl of this.
Technically it could ship in a standalone package if the core team decided it needs to mature some more before pulling into stdlib. It does not really need language features to be implemented AFAICS.
default buffer behavior
I don't think it's a good idea to make this decision for the user, and definitely is picking .max
as default buffer size not a good idea for most use cases.
Instead not default buffer size should be provided and users must decide what size to use.
The buffer size must be > 0.
yield / push / offer | back-pressure
This type is mostly for bridging non stream aware, non async, non back-pressurable sources to the world of async sequences. As such, what we can do in terms of back-pressure is limited, but crucial.
The offer / push operation, currently called yield
currently returns nothing, this is bad as it makes it impossible for the sender to e.g. back off and try again. It is a naive form of back-pressure, but better than nothing.
The operation should instead return an enum signaling what the state of the stream is: e.g. enqueued
, dropped
, terminated
. Some nice prior art to follow here exists in Akka Stream's Source.queue, details here QueueOfferResult:
case OfferResult {
case enqueued
case dropped
// maybe: case Failure(cause: Throwable) extends QueueCompletionResult // optional
case terminated // or queueClosed or similar ("finished" I guess)
}
On the naming of yield: personally, I really don't like calling this yield... A yield operation is not something that one usually associates with something that can fail. This is not a generator, even though it'd like to pretend it is 
"Cancellation handlers"
I understand what this is trying to offer, but I remain confused about how this would actually be used.
It also feels weird that the termination can be finished or cancelled. This goes against the grain of what reactive streams have established, and we in some ways also follow in Swift's Structured Concurrency: cancellation only flows "down", and completion flows "up" - it feels very weird to see cancelled in an "up" position.
I'm a little bit concerned that the termination handler is a property rather than just a .onTermination { ... }
it seems this makes it hard to register multiple parties for listening for termination here?
Naming: sometimes the types use "finish
" and sometimes "termination
", it seems inconsistent. I would suggest sticking to "Completion" as "either error or successful". Then finish becomes complete()
and fail(Error)
and either of those is complete
, as such then the type can be called Completed
.
I also can't help but wonder how it would actually be used... the example only does some println
which does not really show real world usage much. Won't we be forced into locking around things here? Or is the intent that the source would be contained in an actor, and from the cancellation handler we'd kick off async { self.actuallyCancelTheWork() }
or something like that? Could you clarify this piece of the design?
Type name
I'm sure you've had enough of people complaining about the name, but I really feel very strongly about this. And given my past experience* building stream libraries I feel this is productive input.
I, personally, would very much object to calling this "AsyncStream
" - this isn't "the stream" and this send a very wrong message to users of this type. It's not a type intended to be passed around a lot, and it's not the return type that should be used in APIs, an "some AsyncSequence<T>
" (which we can't do today...) is what we'd really want OR some type-eraser.
It should likely not be surfaced in return types or in general APIs which intend to be ABI stable.
And calling it "stream" makes it sound like it should be. It just happens to be one of many popular ways to kick off a stream. Other approaches are "unfold" (see here: Source.unfold • Akka Documentation) which could be implemented in the future. Some name using Source, Origin, Builder (meh) or Generator (not quite) may be considered.
It could also be considered to have the type NOT conform to AsyncSequence but have a .sequence
or some run()
or similar that returns an async sequence -- this would help avoiding the mistake of surfacing this type by accident perhaps?
*Disclaimer: I worked on the original reactive streams specification, as well as akka streams which has this exact capability that this is modeling. There we where it is neatly named by what it offers, being a source to a stream: Source.queue.