SE-0314: AsyncStream and AsyncThrowingStream

Hello Swift community,

The review of SE-0314 "AsyncStream and AsyncThrowingStream" begins now and runs through May 25, 2021.

Reviews are an important part of the Swift evolution process. All review feedback should be either on this forum thread or, if you would like to keep your feedback private, directly to the review manager. When emailing the review manager directly, please keep the proposal link at the top of the message.

What goes into a review?

The goal of the review process is to improve the proposal under review through constructive criticism and, eventually, determine the direction of Swift. When writing your review, here are some questions you might want to answer in your review:

  • What is your evaluation of the proposal?
  • Is the problem being addressed significant enough to warrant a change to Swift?
  • Does this proposal fit well with the feel and direction of Swift?
  • If you have used other languages or libraries with a similar feature, how do you feel that this proposal compares to those?
  • How much effort did you put into your review? A glance, a quick reading, or an in-depth study?

More information about the Swift evolution process is available at

https://github.com/apple/swift-evolution/blob/master/process.md

Thank you,

Doug Gregor
Review Manager

17 Likes

I'm +0.8 on this proposal. I was hoping this could be adapted to handle situations where backpressure is needed; however, I don't see how that could be done with the current approach. In the pitch for YieldingContinuation, the yield function returned false if there were no potential awaiting calls to next. This would allow the caller to either drop the current value or buffer it and stop producing values until next is called again. Is this supposed to replace the need for YieldingContinuation?

2 Likes

What is your evaluation of the proposal?

-1 specifically on two details:

  • The default unlimited buffering behaviour is unideal.
  • Lack of backpressure support.

Both points have been previously brought up in the pitch thread.

Perhaps the quickest way to illustrate why this is an issue is to try lifting some arbitrarily large Collections or Sequences into the async domain. These are not uncommon use cases:

  • (mentioned in the pitch) paginated read from a large file or a remote resource
  • an Int.min ... Int.max range
  • a random number generator
  • a cycled collection
  • maybe just a very large buffer in memory.

Users often compose these kinds of infinite or resource-backed streams with other streams provided by the system or defined by their own application logic. The proposal caters for the latter, but does not produce an answer for the former.

The default behaviour of maxBufferedElements: Int = .max means that yield() will accept them and append them to the buffer. Since there is no backward indication or mechanism to signal that the buffer is filling up, iteration is allowed to continue. Eventually, these collections or resources will be exhaustively consumed, until either we run out of memory to allocate for the buffer, or we are stuck in an infinite loop long enough to be killed by the OS, whichever comes first.

Specifying a maxBufferedElements in 0 ..< .max does not help either, because it introduces the drop-oldest-on-full behaviour. It might work depending on the use case, but it is not a general solution.
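To make the drop-oldest-on-full behaviour concrete, here is a minimal synchronous sketch. `DropOldestBuffer` and its `limit` parameter are hypothetical names for illustration only, not the proposal's implementation, and the sketch assumes a limit greater than zero.

```swift
/// Hypothetical bounded buffer that mimics a drop-oldest-on-full policy.
/// This is an illustration, not the proposal's implementation.
struct DropOldestBuffer<Element> {
    private(set) var elements: [Element] = []
    let limit: Int

    init(limit: Int) {
        // Assumption for this sketch: the limit is at least 1.
        precondition(limit > 0, "limit must be positive")
        self.limit = limit
    }

    /// Appends a value. Once the buffer is full, the oldest value is
    /// discarded silently, so the producer never learns about the loss.
    mutating func append(_ value: Element) {
        if elements.count == limit {
            elements.removeFirst()
        }
        elements.append(value)
    }
}

var buffer = DropOldestBuffer<Int>(limit: 3)
for value in 1...5 {
    buffer.append(value)   // the producer is never told to slow down
}
print(buffer.elements)     // prints "[3, 4, 5]"
```

The values 1 and 2 are lost without any signal to the producer, which is why a bounded limit alone does not amount to backpressure.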

Another way to summarize these issues is that:

  1. There is no way for a producer to regulate itself based on the rate of consumption, aka. backpressure support.

  2. The unlimited buffering diverges from the current mental model of Sequence. If you do:

    let infiniteEnergyGenerator: AnySequence<T>
    
    for i in infiniteEnergyGenerator {
        break
    }
    
    print("Hello world")
    

    Will Swift exhaustively consume infiniteEnergyGenerator? No, it stops consuming it, as soon as the programmer instructs it to do so.

    So if the base language works like this, can we achieve the same with AsyncStream as is? No. We get stuck in an infinite loop in the initializer of AsyncStream, draining the infinite resource.

In short, I am skeptical about accepting the proposal as is, with an unfortunate, dangerous default and the absence of an important mechanism, into the Swift Standard Library.

Compared to the reactive streams libraries we currently have, AsyncStream is born into a world with suspendable async functions. All the above problems, which have been headaches for us reactive streams library maintainers, can be naturally solved with the liberty of not having to synchronously return to the caller. Let's perhaps seal these problems as part of history, rather than carrying them into the new Swift async world.

14 Likes

Big +1 as I already said in the pitch thread.
It's a perfect fit for RSocket (application protocol providing Reactive Streams semantics over the network). Backpressure is handled by the protocol through demand/subscription, similar to Combine. We therefore do not need any backpressure support from AsyncStream/AsyncThrowingStream.
That being said, we still need a way to observe demand/consumption (i.e. calls to next()), but I think we can simply wrap AsyncStream in our own type which implements AsyncSequence. We can then observe calls to next() to send a demand to the server and then let AsyncStream.next() do the heavy lifting.
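A rough sketch of that wrapping idea might look like the following. `DemandObserving`, `onDemand` and `collectWithDemandCount` are hypothetical names, and the `AsyncStream<Int> { ... }` initializer used here is the one in the shipped standard library, which may differ from the signature under review.

```swift
/// Hypothetical wrapper that reports every call to `next()` before
/// forwarding it to the wrapped sequence's iterator.
struct DemandObserving<Base: AsyncSequence>: AsyncSequence {
    typealias Element = Base.Element

    let base: Base
    /// Called once per `next()`. A network protocol could send a
    /// "request one more" frame here.
    let onDemand: () -> Void

    struct AsyncIterator: AsyncIteratorProtocol {
        var baseIterator: Base.AsyncIterator
        let onDemand: () -> Void

        mutating func next() async throws -> Base.Element? {
            onDemand()                            // signal a demand of one
            return try await baseIterator.next()  // the base does the real work
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(baseIterator: base.makeAsyncIterator(), onDemand: onDemand)
    }
}

/// Usage sketch: counts how often the consumer asked for an element.
func collectWithDemandCount() async throws -> (values: [Int], demands: Int) {
    final class Counter { var value = 0 }
    let counter = Counter()

    let numbers = AsyncStream<Int> { continuation in
        for i in 1...3 { continuation.yield(i) }
        continuation.finish()
    }
    let observed = DemandObserving(base: numbers, onDemand: { counter.value += 1 })

    var values: [Int] = []
    for try await value in observed {
        values.append(value)
    }
    return (values, counter.value)
}
```

The loop calls `next()` four times here: three calls return elements and the last one returns nil, so a real protocol adapter would have to decide whether that final call should still translate into a demand.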

Thank you to all the authors for the hard work you all put into the implementation and proposal! I really appreciate it :slight_smile:

The naming difficulties were mentioned during the pitch phase.
Would "generator" be a valid alternative to "stream"?

Similar to SE-0300, could the throwing and non-throwing types be combined?

Instead of the nested Termination enum, could an optional failure type be used?
However, .cancelled would require Failure == Error or Failure == CancellationError.

public struct AsyncGenerator<Element, Failure: Error> {
  public struct Continuation: Sendable {
    public func yield(_ value: Element)
    public func finish(throwing error: Failure? = nil)
    public var onTermination: (@Sendable (Failure?) -> Void)? { get nonmutating set }
  }
}

  • What is your evaluation of the proposal?

I haven’t been following the pitch so this is the first time I’ve looked at it. I like it overall.

I do have one critique though. I think AsyncThrowingStream should support typed errors to match Result. I can see that this isn’t really possible without having typed errors in the language so I do understand why it is designed the way it is. That said, I think we will regret baking this design into the standard library if typed errors are added later (something I think would be a good idea). I don’t see this issue addressed anywhere in the proposal and think it should at least be updated with a rationale and possible paths forward should typed errors be added later.

  • Is the problem being addressed significant enough to warrant a change to Swift?

Yes

  • Does this proposal fit well with the feel and direction of Swift?

Yes

  • If you have used other languages or libraries with a similar feature, how do you feel that this proposal compares to those?

N/A

  • How much effort did you put into your review? A glance, a quick reading, or an in-depth study?

A quick read

3 Likes

+0.5, I empathize with the folks saying that this feature will be somewhat hobbled by lack of proper backpressure support.

The way I see it, there are three main cases where a simplified implementation of AsyncSequence would be useful:

  1. To bridge API you don't own which calls back in a context you cannot synchronize with. buyVegetables is an example of this from the proposal. If it is backed by a DispatchQueue, we cannot async onto that queue, only provide callbacks.
  2. To bridge API that you don't own, that calls back in a context that you can synchronize with. DispatchSource.makeSignalSource is an example of this, because you can provide your own DispatchQueue and async onto that.
  3. Bridging API you own, where you might want to be smart about how you deal with back pressure.

This proposal addresses item 1, and partially addresses item 2 with the caveat that it may end up using a suboptimal locking primitive to perform synchronization. It does nothing for case 3.

I recently wrote an implementation of AsyncSequence for NIO's ChannelInboundHandler (thread), and after thinking about it some more I believe this proposal can be generalized in a way that addresses all three cases. The basic idea would be to provide a few more configuration points. Namely, allow users to specify the synchronization mechanism to use and allow direct access to the buffer.
By default, we can provide the same locking behavior proposed here, but we can also allow users to implement a function to use the locking mechanism that makes sense for their use case. For DispatchSignalSource this would be DispatchQueue.async, while for NIO this would be EventLoop.execute.
The infrastructure can then use the locking mechanism to store continuation and react to new data becoming available, much like I do in my earlier example. By default this can have an infinite buffer, but more advanced use cases can inspect the buffer state like how many items are buffered, and whether the stream is currently being awaited (and a callback for when the stream moves from unawaited -> awaited).
Such an approach could also enable progressive improvement of a bridged API, so folks can improve a naive implementation by adding back pressure handling without ditching the AsyncStream implementation (and be sure they address some of the more nuanced issues of a full AsyncSequence implementation).

4 Likes

+1

  • What is your evaluation of the proposal?
    Perfectly fits in the domain I am working in (app development).

  • Is the problem being addressed significant enough to warrant a change to Swift?
    Yes

  • Does this proposal fit well with the feel and direction of Swift?
    Yes

  • If you have used other languages or libraries with a similar feature, how do you feel that this proposal compares to those?
    I coded a similar solution to bridge the gap from a callback based api to async/await semantics.

  • How much effort did you put into your review? A glance, a quick reading, or an in-depth study?
    Just a quick read, but the solution is almost identical to the one I implemented - so I spent a much longer time with the problem the proposal solves.

1 Like

+0.8 - I think it's an important problem to address and goes a long way towards solving it, with a few caveats.

In particular, I have been thinking about how this would interact with a Combine Publisher. To allow using for await with a Publisher, I think I would want the subscription to be created in makeIterator() async and then cancelled if the loop was exited early. I think this would mean AsyncStream should adopt AsyncIteratorProtocol itself, not AsyncSequence.

I've used node.js Streams and js for await ... of / asyncIterator. In particular, for things like iterating through the lines of a file, it was conceptually easier for the Async Iterable (equivalent of AsyncSequence) to handle opening the file when creating the iterator and then ensuring it was closed at the end of iteration (whether because the end of the file was reached, or the iterating code decided to stop).

.Net's IObservable.ToAsyncEnumerable() has similar semantics.

Read the pitch threads and compared implementation to equivalents in other languages.

I think there are a few points that deserve a bit of response here to clear things up:

That isn't exactly true - this type itself does not handle back-pressure, instead it is intended that the protocol AsyncIteratorProtocol handles the concept of back-pressure - as @dnadoba pointed out.

Per unlimited buffering -

The buffer itself only buffers what has not yet been consumed, and once a terminal state is hit the buffer is no longer appended to. So if there is a cancel, either implicitly via deinit (e.g. someone no longer has a reference to the backing storage of the AsyncStream, which can happen when break is used in the loop) or explicitly via Task.cancel, the buffer will no longer accept values to be appended. This means that it is as much of an issue as backed-up pending async calls that might be in the executor. The executor's buffer of pending calls is not controllable in size, which means that even in an async or actor based context there is a hidden unlimited buffer. AsyncStream allows control over that buffer size and drops the oldest values accordingly. To be clear, this is the starting point, and it would be reasonable (with the current type constraints) to add additional functionality to control that concept.

Per Ben's commentary about calling it generator:

That name definitely is pretty loaded - I think that any language level generators should interplay with AsyncStream in the future but camping on that name probably isn't a great idea.

This is an interesting concept, but how would it be able to enforce that? I guess the current initializer could be restricted to only permit where Failure == Error. I am not opposed to this change but coming from maintaining Combine (with the most deference to this highly debated topic) - I tend to find that typed errors are a real stumbling block for a lot of users.

To be super clear on this: Combine is not part of the Swift open source efforts.

That being said; a trivial approach with this API could allow for a quick and dirty adaptation of a Publisher to an AsyncSequence via AsyncStream or AsyncThrowingStream by using .sink to yield values and cancelling the cancellable token in onTermination. It could also very easily be conceived that, since next is isomorphic to a demand of 1, it could be written a bit thinner without needing an intermediary type. These are just ruminations on the utility of AsyncStream and by no means anything more.

3 Likes

As an additional note, AsyncStream isn't designed to support everything that could be provided as a sequence of elements — an adapter for an existing non-async sequence would not need to buffer, as it can use the iterator as its state and produce elements on demand.
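As a sketch of what such an unbuffered adapter could look like: the wrapped iterator is the only state, and an element is produced only when `next()` is called. `LiftedSequence` and `firstThree` are hypothetical names used for illustration.

```swift
/// Hypothetical adapter that lifts a synchronous Sequence into an
/// AsyncSequence without any buffer. The wrapped iterator is the only state.
struct LiftedSequence<Base: Sequence>: AsyncSequence {
    typealias Element = Base.Element

    let base: Base

    struct AsyncIterator: AsyncIteratorProtocol {
        var iterator: Base.Iterator

        mutating func next() async -> Base.Element? {
            iterator.next()   // one element is produced per demand
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(iterator: base.makeIterator())
    }
}

/// Usage sketch: iterating a huge range stops as soon as the loop breaks.
func firstThree() async -> [Int] {
    var result: [Int] = []
    for await value in LiftedSequence(base: 0...Int.max) {
        result.append(value)
        if result.count == 3 { break }   // nothing past this point is produced
    }
    return result
}
```

Because the consumer drives production, breaking out of the loop ends the work immediately, matching the behaviour of a plain for-in loop over the same range.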

Given that the most salient attribute of the proposed types may be their particular buffering behavior (in that you only choose these types if you need that kind of buffering), a naming alternative could be BufferedStream and ThrowingBufferedStream. To me that would discourage use in things that wouldn't need that behavior or overhead at all…

That would make a lot of sense IMO. Also some comment on the proposal with some guidance in regards to this would be appreciated, like mentioning the use of AsyncIteratorProtocol.

1 Like

That name only makes sense for the site APIs advertising these; the consumer of them probably does not care about that detail. It also rules out any construction method other than the buffering continuation version, whereas the current implementation does permit potential future directions that do not require buffering.

I know it's a few days past the review deadline, but it slipped past me with so much concurrency stuff happening... concurrently (?). I was asked to give this a review and drop some notes though, so better late than after conclusions are posted...

This feedback was already shared with proposal authors and the core team - there's nothing new here. But for the sake of public record I thought it is fair to include the notes in the official review thread.


Overall type usefulness

:100: Yes, the type, and general pattern, is of great value and I have no doubt about its utility.

I'm very supportive of making this happen in general!

I have prior experience on the strong need for such types and can also see where we need it in Swift today. Unless we provide an implementation, there will be plenty of semi-broken ad-hoc implementations of the pattern floating around; It really makes sense to have one blessed impl of this.

Technically it could ship in a standalone package if the core team decided it needs to mature some more before pulling into stdlib. It does not really need language features to be implemented AFAICS.

default buffer behavior

I don't think it's a good idea to make this decision for the user, and picking .max as the default buffer size is definitely not a good idea for most use cases.

Instead, no default buffer size should be provided, and users must decide what size to use.

The buffer size must be > 0.

yield / push / offer | back-pressure

This type is mostly for bridging non stream aware, non async, non back-pressurable sources to the world of async sequences. As such, what we can do in terms of back-pressure is limited, but crucial.

The offer / push operation, currently called yield, returns nothing. This is bad, as it makes it impossible for the sender to e.g. back off and try again. It is a naive form of back-pressure, but better than nothing.

The operation should instead return an enum signaling what the state of the stream is: e.g. enqueued, dropped, terminated. Some nice prior art to follow here exists in Akka Stream's Source.queue, details here QueueOfferResult:

enum OfferResult {
  case enqueued
  case dropped
  // maybe: case Failure(cause: Throwable) extends QueueCompletionResult // optional
  case terminated // or queueClosed or similar ("finished" I guess)
}
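To illustrate how a producer could use such a result, here is a small synchronous sketch. `OfferQueue`, `offer` and `capacity` are hypothetical names; note that this sketch rejects the incoming value when full, which is a different policy from the proposal's drop-oldest behaviour.

```swift
/// Hypothetical result type, following the suggestion above.
enum OfferResult: Equatable {
    case enqueued
    case dropped
    case terminated
}

/// Hypothetical bounded queue whose `offer` reports what happened to the value.
struct OfferQueue<Element> {
    private(set) var elements: [Element] = []
    private(set) var isTerminated = false
    let capacity: Int

    init(capacity: Int) {
        self.capacity = capacity
    }

    /// Tries to enqueue a value and tells the sender the outcome.
    mutating func offer(_ value: Element) -> OfferResult {
        if isTerminated { return .terminated }
        if elements.count >= capacity { return .dropped }   // incoming value is rejected
        elements.append(value)
        return .enqueued
    }

    /// Marks the queue as finished; later offers report `.terminated`.
    mutating func finish() {
        isTerminated = true
    }
}

var queue = OfferQueue<String>(capacity: 1)
let first = queue.offer("a")        // fits into the queue
let second = queue.offer("b")       // queue is full, so the sender can back off and retry
queue.finish()
let afterFinish = queue.offer("c")  // the sender learns it should stop producing
```

With a return value like this, the sender can at least retry later or stop, instead of producing blindly.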

On the naming of yield: personally, I really don't like calling this yield... A yield operation is not something that one usually associates with something that can fail. This is not a generator, even though it'd like to pretend it is :wink:

"Cancellation handlers"

I understand what this is trying to offer, but I remain confused about how this would actually be used.

It also feels weird that the termination can be finished or cancelled. This goes against the grain of what reactive streams have established, and we in some ways also follow in Swift's Structured Concurrency: cancellation only flows "down", and completion flows "up" - it feels very weird to see cancelled in an "up" position.

I'm a little bit concerned that the termination handler is a property rather than just a .onTermination { ... }; it seems this makes it hard to register multiple parties to listen for termination here?

Naming: sometimes the types use "finish" and sometimes "termination", it seems inconsistent. I would suggest sticking to "Completion" as "either error or successful". Then finish becomes complete() and fail(Error) and either of those is complete, as such then the type can be called Completed.

I also can't help but wonder how it would actually be used... the example only does some println which does not really show real world usage much. Won't we be forced into locking around things here? Or is the intent that the source would be contained in an actor, and from the cancellation handler we'd kick off async { self.actuallyCancelTheWork() } or something like that? Could you clarify this piece of the design?

Type name

I'm sure you've had enough of people complaining about the name, but I really feel very strongly about this. And given my past experience* building stream libraries I feel this is productive input.

I, personally, would very much object to calling this "AsyncStream": this isn't "the stream", and the name sends a very wrong message to users of this type. It's not a type intended to be passed around a lot, and it's not the return type that should be used in APIs; a "some AsyncSequence<T>" (which we can't do today...) is what we'd really want, OR some type-eraser.

It should likely not be surfaced in return types or in general APIs which intend to be ABI stable.

And calling it "stream" makes it sound like it should be. It just happens to be one of many popular ways to kick off a stream. Other approaches are "unfold" (see here: Source.unfold • Akka Documentation) which could be implemented in the future. Some name using Source, Origin, Builder (meh) or Generator (not quite) may be considered.

It could also be considered to have the type NOT conform to AsyncSequence but have a .sequence or some run() or similar that returns an async sequence -- this would help avoiding the mistake of surfacing this type by accident perhaps?


*Disclaimer: I worked on the original reactive streams specification, as well as akka streams which has this exact capability that this is modeling. There it is neatly named by what it offers, being a source to a stream: Source.queue.

15 Likes

Sorry for the late comment, but there's one thing I'd like to say/ask.

  • What happens to the iterator when the AsyncStream is cancelled? Does it
    1. drop buffered items and immediately start returning nil,
    2. stop accepting new items from continuation, but continue to serve the buffered items, or
    3. continue to accept new items and wait for cooperative termination from the AsyncStream.init build closure?

  • Back pressure would be a useful addition.
  • Dropping values when the buffer is full seems like pretty confusing behaviour. It is very easy to misunderstand the implication of setting maxBufferedElements.
  • AsyncStream is a weird name for its functionality, though I don't have any suggestions.

Having investigated manually implementing an AsyncSequence for Alamofire’s stream requests and looked at AsyncStream’s yet-unmerged implementation, I think this functionality is definitely necessary, for both performance (I see you, _Deque) and correctness. However, I think there need to be two versions of it.

  • Smart streams, which model a stream with pull semantics and can support things like backpressure. I’m not sure this can be generalized enough for use with the standard library, but it might be possible.
  • Dumb streams, which model a stream with push semantics. These streams are usually outside the client’s control, like most high level network streams. Alamofire’s DataStreamRequest and WebSocketRequest have no ability to tell the server to slow down (technically, a web socket server could do this, but that’s very rare).

It certainly seems like trying to fit all of these capabilities into a single type is doomed to fail in some regard. In fact, it seems likely that each reactive library should probably have its own, dedicated adapter type for AsyncSequence which can exactly match the semantics of the library’s stream model.

In Alamofire’s case it’s really only the buffering and cancellation behaviors we need. In fact, I think it might be a good idea for the buffering to be more customizable than simply the number of items and instead something that can express a custom predicate. Streams of things like web socket messages lend themselves to being measured by count, but raw Data streams really need to be measured by total size.
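One way to picture a limit that is measured by something other than element count is a buffer parameterised by a cost function. This is only a sketch: `CostLimitedBuffer`, `maxCost` and `cost` are hypothetical names, and evicting the oldest elements is just one possible policy.

```swift
/// Hypothetical buffer limited by a total cost rather than an element count.
struct CostLimitedBuffer<Element> {
    private(set) var elements: [Element] = []
    private(set) var totalCost = 0
    let maxCost: Int
    let cost: (Element) -> Int

    init(maxCost: Int, cost: @escaping (Element) -> Int) {
        self.maxCost = maxCost
        self.cost = cost
    }

    /// Appends the value, then evicts the oldest elements until the
    /// total cost fits within `maxCost` again.
    mutating func append(_ value: Element) {
        elements.append(value)
        totalCost += cost(value)
        while totalCost > maxCost, let oldest = elements.first {
            totalCost -= cost(oldest)
            elements.removeFirst()
        }
    }
}

// Byte arrays stand in for raw data chunks; the limit is 8 bytes in total.
var chunks = CostLimitedBuffer<[UInt8]>(maxCost: 8) { $0.count }
chunks.append([1, 2, 3, 4, 5])   // total cost 5
chunks.append([6, 7])            // total cost 7
chunks.append([8, 9, 10])        // total cost 10, so the first chunk is evicted
```

Passing `{ _ in 1 }` as the cost function gives a plain count limit, so a count-based limit is a special case of this shape.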

To better differentiate these different types of streams, perhaps a more specific name would be better? Perhaps AsyncBufferingStream? I think the name should be a product of the type’s specialization, not a representation of its general idea.

3 Likes

I'm super late to the party; can someone please point me to the rationale on why AsyncThrowingStream has no Failure: Error generic type parameter?

We already have some precedent for that:

public struct ThrowingTaskGroup<ChildTaskResult, Failure> where Failure : Error { ... }

I also think that a generic type parameter for the Failure would keep the door open for convenient transitions between APIs that fully support explicit error types such as in the Combine framework, or if Swift decides to add typed throws.

4 Likes

Unfortunately I haven't had time to review all the concurrency proposals in depth. But I agree with @DevAndArtist here - this should probably have a typed error. I certainly hope we're not baking design decisions into the standard library that we might regret later if Swift eventually gets typed errors.

5 Likes

The current proposal as it stands is intended to be an intermediary between a back pressure system (AsyncSequence) and systems that do not offer back pressure. If a system can express back pressure, then it should be immediately adaptable to an AsyncSequence, because the concept of back pressure there is that each application of the async function next on the iterator is a demand of 1.

That being said; it is perfectly reasonable for us to add an initializer that unfolds, e.g. init(unfolding: @Sendable @escaping () async -> Element?), which then does no buffering. However, that makes the type potentially an "eraser" type, which comes with its own baggage for optimizations and such. It seems that this is a misunderstanding that keeps being brought up. Perhaps the eraser type is worth it? I would like to get the opinion of other folks maintaining the standard library/Concurrency library to weigh in on this part. @Ben_Cohen or @Douglas_Gregor might be opinionated about this and able to offer some constructive feedback here.
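The unfolding idea can be sketched as a standalone type in which each call to next() invokes the closure exactly once, so nothing is produced ahead of demand. `UnfoldingSequence`, `Countdown` and `runCountdown` are hypothetical names; this is not the initializer being discussed, only an illustration of its shape.

```swift
/// Hypothetical type sketching the unfolding idea: every `next()` calls the
/// closure once, so there is no buffer and no production ahead of demand.
struct UnfoldingSequence<Element>: AsyncSequence {
    let produce: @Sendable () async -> Element?

    struct AsyncIterator: AsyncIteratorProtocol {
        let produce: @Sendable () async -> Element?

        mutating func next() async -> Element? {
            await produce()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(produce: produce)
    }
}

/// Hypothetical source of values; the actor protects the mutable counter.
actor Countdown {
    private var remaining: Int

    init(from start: Int) {
        remaining = start
    }

    /// Returns the current value and decrements, or nil once it reaches zero.
    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

/// Usage sketch: the closure is called once per loop iteration.
func runCountdown() async -> [Int] {
    let countdown = Countdown(from: 3)
    var values: [Int] = []
    for await value in UnfoldingSequence(produce: { await countdown.next() }) {
        values.append(value)
    }
    return values
}
```

Since the closure's type is erased behind a function value, this shape also hints at the "eraser" cost mentioned above: the compiler cannot see through `produce` the way it can through a concrete iterator type.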

This is something that I initially explored however the reason that we avoided that is because it ends up needing to make call-outs to code within the critical region. I am not saying this can't ever happen in the future but as it currently stands that is a minefield to make generalized for use by a wide audience - I think perhaps in those advanced cases it might be reasonable to have a more targeted approach for now.

I understand where you are coming from w/ the name, but I'm not sure that is fully the right name here. Is it meaningful for the consumer of the type to understand that name? If the name says buffering, it would imply that the consumer has control w.r.t. buffering. I am not sure that really captures what is going on here for the current initializers (but see my other commentary on construction via unfolding).

The reasoning was that there was no way in the language to express this; the construction of AsyncThrowingStream would need to constrain where the Failure was always Error since the throw would be un-typed. This would mean that the constructor would need to be restricted as such:

 public init(
    _ elementType: Element.Type,
    maxBufferedElements limit: Int = .max,
    _ build: (Continuation) -> Void
  ) where Failure == Error

I'm not sure what that gains us except a future in which that might be used? The withThrowing* APIs that currently exist have similar constraints that restrict it to Error. So I guess it would be somewhat useful, but honestly, from my exploration, even doing so won't solve any issues with interfacing to Combine as you might hope: because AsyncSequence itself has no generic effects (the language lacks them), conditional conformance wouldn't be able to be applied twice.

After talking with @Douglas_Gregor a bit more I think we are in agreement this is a sensible approach here. I will have this updated on the proposal to reflect the following:

public struct AsyncThrowingStream<Element, Failure: Error> {
  public struct Continuation: Sendable {
    public enum Termination {
      case finished(Failure?)
      case cancelled
    }
    
    /// * See AsyncStream.Continuation.yield(_:) *
    public func yield(_ value: Element)

    /// Resume the task awaiting the next iteration point with a terminal state.
    /// If error is nil, this is a completion without error. If error is not
    /// nil, then the error is thrown. Both of these states indicate the 
    /// end of iteration.
    ///
    /// - Parameter error: The error to throw or nil to signify termination.
    ///
    /// Calling this function more than once is idempotent. All values received
    /// from the iterator after it is finished and after the buffer is exhausted 
    /// are nil.
    public func finish(throwing error: Failure? = nil)

    /// * See AsyncStream.Continuation.onTermination *
    public var onTermination: (@Sendable (Termination) -> Void)? { get nonmutating set }
  }

  /// * See AsyncStream.init *
  public init(
    _ elementType: Element.Type,
    maxBufferedElements limit: Int = .max,
    _ build: (Continuation) -> Void
  ) where Failure == Error
}

6 Likes