SE-0314: AsyncStream and AsyncThrowingStream

That name only makes sense for the APIs advertising these; their consumers probably do not care about that detail. It also rules out any construction method other than the buffering continuation version, whereas the current implementation does permit future directions that do not require buffering.

I know it's a few days past the review deadline, but it slipped past me with so much concurrency stuff happening... concurrently (?). I was asked to give this a review and drop some notes, though, so better late than after the conclusions are posted...

This feedback was already shared with the proposal authors and the core team; there's nothing new here. But for the sake of the public record I thought it fair to include the notes in the official review thread.


Overall type usefulness

:100: Yes, the type, and general pattern, is of great value and I have no doubt about its utility.

I'm very supportive of making this happen in general!

I have prior experience with the strong need for such types and can also see where we need them in Swift today. Unless we provide an implementation, there will be plenty of semi-broken ad-hoc implementations of the pattern floating around; it really makes sense to have one blessed implementation of this.

Technically it could ship in a standalone package if the core team decided it needs to mature some more before being pulled into the stdlib. It does not really need language features to be implemented AFAICS.

default buffer behavior

I don't think it's a good idea to make this decision for the user, and picking .max as the default buffer size is definitely not a good idea for most use cases.

Instead, no default buffer size should be provided, and users must decide what size to use.

The buffer size must be > 0.
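A minimal sketch of what "no default, must be > 0" could look like. The names `BufferSize` and `SourceConfiguration` are hypothetical and not part of the proposal: the size type rejects non-positive values when it is constructed, and the configuration takes it without a default value, so every call site has to choose a size.

```swift
/// Hypothetical buffer size: must be chosen explicitly and must be > 0.
struct BufferSize {
    let value: Int

    /// Fails for 0 or negative sizes instead of silently accepting them.
    init?(_ value: Int) {
        guard value > 0 else { return nil }
        self.value = value
    }
}

/// Hypothetical configuration for a stream source. The size parameter has
/// no default value, so every call site has to state one.
struct SourceConfiguration {
    let bufferSize: BufferSize

    init(bufferSize: BufferSize) {
        self.bufferSize = bufferSize
    }
}
```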

yield / push / offer | back-pressure

This type is mostly for bridging non-stream-aware, non-async, non-back-pressurable sources to the world of async sequences. As such, what we can do in terms of back-pressure is limited, but crucial.

The offer/push operation, currently called yield, returns nothing. This is bad, as it makes it impossible for the sender to, e.g., back off and try again. That is a naive form of back-pressure, but better than nothing.

The operation should instead return an enum signaling the state of the stream: e.g. enqueued, dropped, terminated. Some nice prior art to follow here exists in Akka Streams' Source.queue; see QueueOfferResult:

enum OfferResult {
  case enqueued
  case dropped
  // maybe: case failed(Error) // optional; Akka's equivalent is Failure(cause: Throwable)
  case terminated // or queueClosed or similar ("finished" I guess)
}
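A minimal single-threaded sketch of the suggested behaviour, mirroring the OfferResult enum above. The `BoundedQueue` type and its `offer(_:)`, `poll()` and `close()` methods are hypothetical, not the proposal's API. The point is that `offer(_:)` reports what happened instead of returning nothing, so a sender can react to `.dropped` by backing off and retrying.

```swift
/// Mirrors the suggested result of an offer/push/yield operation.
enum OfferResult: Equatable {
    case enqueued
    case dropped     // buffer full: the sender may back off and retry
    case terminated  // the stream was closed: further offers are pointless
}

/// Hypothetical single-threaded sketch; a real source would need synchronization.
struct BoundedQueue<Element> {
    private var elements: [Element] = []
    private var isClosed = false
    let capacity: Int

    init(capacity: Int) {
        precondition(capacity > 0, "buffer size must be > 0")
        self.capacity = capacity
    }

    /// Reports what happened to the element instead of returning nothing.
    mutating func offer(_ element: Element) -> OfferResult {
        if isClosed { return .terminated }
        if elements.count >= capacity { return .dropped }
        elements.append(element)
        return .enqueued
    }

    /// Consumer side: takes the oldest buffered element, if any.
    mutating func poll() -> Element? {
        guard !elements.isEmpty else { return nil }
        return elements.removeFirst()
    }

    mutating func close() {
        isClosed = true
    }
}
```

A producer can then branch on the result, for example retrying later when it sees `.dropped` and stopping altogether on `.terminated`.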

On the naming of yield: personally, I really don't like calling this yield... A yield operation is not something that one usually associates with something that can fail. This is not a generator, even though it'd like to pretend it is :wink:

"Cancellation handlers"

I understand what this is trying to offer, but I remain confused about how this would actually be used.

It also feels weird that the termination can be finished or cancelled. This goes against the grain of what reactive streams have established, and of what Swift's Structured Concurrency in some ways also follows: cancellation only flows "down", and completion flows "up". It feels very weird to see cancelled in an "up" position.

I'm a little bit concerned that the termination handler is a property rather than a method like .onTermination { ... }; it seems this makes it hard to register multiple parties to listen for termination?

Naming: sometimes the types use "finish" and sometimes "termination", which seems inconsistent. I would suggest sticking to "Completion" to mean "either error or success". Then finish becomes complete() and fail(Error), either of which completes the stream, so the type can be called Completed.
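To make the two suggestions concrete, here is a minimal sketch using hypothetical names (`Completed`, `CompletionSignal`, `onCompletion(_:)`, `complete()`, `fail(_:)`); it is not the proposal's API and is not thread-safe. Observers are registered through a method, so several parties can listen to the same outcome, and completion is either success or failure.

```swift
/// Hypothetical outcome type: a stream completes either successfully or with an error.
enum Completed {
    case success
    case failure(Error)
}

/// Hypothetical, single-threaded sketch (no locking): termination observers are
/// registered through a method, so more than one party can listen.
final class CompletionSignal {
    private var handlers: [(Completed) -> Void] = []
    private var outcome: Completed?

    /// Registers an observer. If completion already happened, it runs immediately.
    func onCompletion(_ handler: @escaping (Completed) -> Void) {
        if let outcome = outcome {
            handler(outcome)
        } else {
            handlers.append(handler)
        }
    }

    /// Successful completion (what the proposal calls finish()).
    func complete() {
        finish(with: .success)
    }

    /// Completion with an error (what the proposal calls finish(throwing:)).
    func fail(_ error: Error) {
        finish(with: .failure(error))
    }

    private func finish(with result: Completed) {
        // Only the first completion counts; later calls are ignored.
        guard outcome == nil else { return }
        outcome = result
        let registered = handlers
        handlers.removeAll()
        for handler in registered {
            handler(result)
        }
    }
}
```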

I also can't help but wonder how it would actually be used... the example only does some println, which does not really show much real-world usage. Won't we be forced into locking around things here? Or is the intent that the source would be contained in an actor, and that from the cancellation handler we'd kick off async { self.actuallyCancelTheWork() } or something like that? Could you clarify this piece of the design?

Type name

I'm sure you've had enough of people complaining about the name, but I really feel very strongly about this. And given my past experience* building stream libraries I feel this is productive input.

I, personally, would very much object to calling this "AsyncStream": this isn't "the stream", and the name sends a very wrong message to users of this type. It's not a type intended to be passed around a lot, and it's not the return type that should be used in APIs; a "some AsyncSequence<T>" (which we can't express today...) or some type eraser is what we'd really want.

It should likely not be surfaced in return types or in general APIs which intend to be ABI stable.

And calling it "stream" makes it sound like it should be. It just happens to be one of many popular ways to kick off a stream. Other approaches are "unfold" (see here: Source.unfold • Akka Documentation) which could be implemented in the future. Some name using Source, Origin, Builder (meh) or Generator (not quite) may be considered.

It could also be considered to have the type NOT conform to AsyncSequence but have a .sequence or some run() or similar that returns an async sequence; this would perhaps help avoid the mistake of surfacing this type by accident?


*Disclaimer: I worked on the original reactive streams specification, as well as on Akka Streams, which has exactly the capability this is modeling. There it is neatly named by what it offers, being a source to a stream: Source.queue.


Sorry for the late comment, but there's one thing I'd like to say/ask.

  • What happens to the iterator when the AsyncStream is cancelled? Does it
    1. drop buffered items and immediately start returning nil,
    2. stop accepting new items from continuation, but continue to serve the buffered items, or
    3. continue to accept new items and wait for cooperative termination from the AsyncSequence.init.build closure?

  • Back pressure would be a useful addition.
  • Dropping values when the buffer is full seems like pretty confusing behaviour. It is very easy to misunderstand the implication of setting maxBufferedElements.
  • AsyncStream is a weird name for its functionality, though I don't have any suggestions.
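Option 2 from the first question could be sketched like this. `DrainingBuffer` and `Pull` are hypothetical names, and this is only an illustration of one of the listed options, not a statement of what the proposal does: after cancellation, new elements are refused, buffered ones are still served, and only then does the consumer see the end. The `.wait` case stands in for the point where an async iterator would suspend.

```swift
/// Result of pulling from the hypothetical buffer below.
enum Pull<Element> {
    case element(Element)  // a buffered value is available
    case wait              // nothing buffered yet; an async iterator would suspend here
    case end               // cancelled and fully drained: iteration is over
}

extension Pull: Equatable where Element: Equatable {}

/// Hypothetical single-threaded sketch of option 2: after cancellation the
/// buffer refuses new elements but still serves the ones already queued.
struct DrainingBuffer<Element> {
    private var buffered: [Element] = []
    private(set) var isCancelled = false

    /// Returns false once cancelled, so a late producer knows its value was refused.
    mutating func accept(_ element: Element) -> Bool {
        guard !isCancelled else { return false }
        buffered.append(element)
        return true
    }

    mutating func cancel() {
        isCancelled = true
    }

    /// Serves buffered elements first; reports .end only once cancelled and empty.
    mutating func next() -> Pull<Element> {
        if !buffered.isEmpty {
            return .element(buffered.removeFirst())
        }
        return isCancelled ? .end : .wait
    }
}
```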

Having investigated manually implementing an AsyncSequence for Alamofire’s stream requests and looked at AsyncStream's yet-unmerged implementation, this functionality is definitely necessary, for both performance (I see you, _Deque) and correctness. However, I think there need to be two versions of it.

  • Smart streams, which model a stream with pull semantics and can support things like backpressure. I’m not sure this can be generalized enough for use with the standard library, but it might be possible.
  • Dumb streams, which model a stream with push semantics. These streams are usually outside the client’s control, like most high level network streams. Alamofire’s DataStreamRequest and WebSocketRequest have no ability to tell the server to slow down (technically, a web socket server could do this, but that’s very rare).

It certainly seems like trying to fit all of these capabilities into a single type is doomed to fail in some regard. In fact, it seems likely that each reactive library should probably have its own, dedicated adapter type for AsyncSequence which can exactly match the semantics of the library’s stream model.

In Alamofire’s case it’s really only the buffering and cancellation behaviors we need. In fact, I think it might be a good idea for the buffering to be more customizable than simply the number of items and instead something that can express a custom predicate. Streams of things like web socket messages lend themselves to being measured by count, but raw Data streams really need to be measured by total size.
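A minimal sketch of buffering by a custom measure rather than by element count. `CostBoundedBuffer` is a hypothetical type that takes a caller-supplied cost function; byte chunks are modelled as `[UInt8]` here to keep the example free of Foundation, whereas a real Alamofire integration would more likely measure `Data`.

```swift
/// Hypothetical buffer bounded by a caller-supplied cost instead of an element count.
struct CostBoundedBuffer<Element> {
    private(set) var elements: [Element] = []
    private(set) var totalCost = 0
    let maxCost: Int
    let cost: (Element) -> Int

    init(maxCost: Int, cost: @escaping (Element) -> Int) {
        precondition(maxCost > 0, "budget must be > 0")
        self.maxCost = maxCost
        self.cost = cost
    }

    /// Enqueues the element only if it fits in the remaining budget.
    mutating func append(_ element: Element) -> Bool {
        let elementCost = cost(element)
        guard totalCost + elementCost <= maxCost else { return false }
        elements.append(element)
        totalCost += elementCost
        return true
    }

    /// Removes the oldest element and releases its share of the budget.
    mutating func popFirst() -> Element? {
        guard !elements.isEmpty else { return nil }
        let element = elements.removeFirst()
        totalCost -= cost(element)
        return element
    }
}
```

The same type covers both cases from the paragraph above: `{ $0.count }` measures raw byte chunks by size, while `{ _ in 1 }` turns it back into a plain count for things like web socket messages.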

To better differentiate these different types of streams, perhaps a more specific name would be better? Perhaps AsyncBufferingStream? I think the name should be a product of the type’s specialization, not a representation of its general idea.


I'm super late to the party; can someone please point me to the rationale for why AsyncThrowingStream has no Failure: Error generic type parameter?

We already have some precedent for that:

public struct ThrowingTaskGroup<ChildTaskResult, Failure> where Failure : Error { ... }

I also think that a generic type parameter for the Failure would keep the door open for convenient transitions between APIs that fully support explicit error types such as in the Combine framework, or if Swift decides to add typed throws.


Unfortunately I haven't had time to review all the concurrency proposals in depth. But I agree with @DevAndArtist here - this should probably have a typed error. I certainly hope we're not baking design decisions into the standard library that we might regret later if Swift eventually gets typed errors.


The current proposal as it stands is intended as an intermediary between a back-pressure system (AsyncSequence) and systems that do not offer back pressure. If a system can express back pressure, it should be directly adaptable to an AsyncSequence, because in that model each call to the iterator's async next() function is a demand of 1.

That being said, it is perfectly reasonable for us to add an initializer that unfolds, e.g. init(unfolding: @Sendable @escaping () async -> Element?), which then does no buffering. However, that potentially makes the type an "eraser" type, which comes with its own baggage for optimizations and such. This seems to be a misunderstanding that keeps being brought up. Perhaps the eraser type is worth it? I would like other folks maintaining the standard library/Concurrency library to weigh in on this part. @Ben_Cohen or @Douglas_Gregor might be opinionated about this and able to offer some constructive feedback here.
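A minimal sketch of the unfolding idea, written as a separate hypothetical `AsyncUnfoldingSequence` type rather than as an initializer on AsyncStream. There is no buffer at all: each call to the iterator's `next()` is a demand of one element, satisfied by awaiting the `produce` closure.

```swift
/// Hypothetical unbuffered, pull-based sequence. Each call to next() is a
/// demand of exactly one element, served by awaiting `produce`.
struct AsyncUnfoldingSequence<Element>: AsyncSequence {
    struct AsyncIterator: AsyncIteratorProtocol {
        let produce: @Sendable () async -> Element?

        mutating func next() async -> Element? {
            // No buffer: nothing is produced until the consumer asks.
            await produce()
        }
    }

    private let produce: @Sendable () async -> Element?

    init(unfolding produce: @escaping @Sendable () async -> Element?) {
        self.produce = produce
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(produce: produce)
    }
}
```

For example, wrapping a method on an actor that returns 1, 2, 3 and then nil yields exactly those three elements, each one requested only when the consumer's `for await` loop asks for it.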

This is something I initially explored; however, we avoided it because it ends up needing to make call-outs to code within the critical region. I am not saying this can't ever happen in the future, but as it currently stands it is a minefield to generalize for use by a wide audience. I think in those advanced cases it might be reasonable to take a more targeted approach for now.

I understand where you are coming from with the name, but I'm not sure that is fully the right name here. Is it meaningful for the consumer of the type to understand that name? A name with "buffering" in it would imply that the consumer has some control over buffering. I am not sure that really captures what is going on with the current initializers (but see my other comment regarding construction via unfolding).

The reasoning was that there was no way in the language to express this: the construction of AsyncThrowingStream would need to constrain Failure to always be Error, since the throw would be untyped. This would mean that the initializer would need to be restricted as such:

public init(
  _ elementType: Element.Type,
  maxBufferedElements limit: Int = .max,
  _ build: (Continuation) -> Void
) where Failure == Error

I'm not sure what that gains us, except a future in which it might be used? The withThrowing* APIs that currently exist have similar constraints restricting them to Error, so I guess it would be somewhat useful. But honestly, from my exploration, even doing so won't solve the issues with interfacing to Combine that you might hope for: AsyncSequence itself has no generic effects (because the language lacks generic effects), which means conditional conformance can't be applied twice.

After talking with @Douglas_Gregor a bit more I think we are in agreement this is a sensible approach here. I will have this updated on the proposal to reflect the following:

public struct AsyncThrowingStream<Element, Failure: Error> {
  public struct Continuation: Sendable {
    public enum Termination {
      case finished(Failure?)
      case cancelled
    }
    
    /// * See AsyncStream.Continuation.yield(_:) *
    public func yield(_ value: Element)

    /// Resume the task awaiting the next iteration point with a terminal state.
    /// If error is nil, this is a completion without error. If error is not
    /// nil, then the error is thrown. Both of these states indicate the 
    /// end of iteration.
    ///
    /// - Parameter error: The error to throw or nil to signify termination.
    ///
    /// Calling this function more than once is idempotent. All values received
    /// from the iterator after it is finished and after the buffer is exhausted 
    /// are nil.
    public func finish(throwing error: Failure? = nil)

    /// * See AsyncStream.Continuation.onTermination *
    public var onTermination: (@Sendable (Termination) -> Void)? { get nonmutating set }
  }

  /// * See AsyncStream.init *
  public init(
    _ elementType: Element.Type,
    maxBufferedElements limit: Int = .max,
    _ build: (Continuation) -> Void
  ) where Failure == Error
}

Glad we could sort out this potentially critical problem in time. I think where Failure == Error is just a patch for another missing feature; even the restriction on withThrowingTaskGroup is not what we actually need here. Unless the types become @frozen before we get typed throws, I hope that in the future we can add an unrestricted init and also backport it via something like @_alwaysEmitIntoClient.

The actual missing feature here is a “default generic type parameter”. For example, the type should really be AsyncThrowingStream<Element, Failure: Error = Error>. That wouldn't require the limitation on the init and also wouldn't require any additional overloads in the future. Unfortunately, Swift does not have this feature yet.


I'm really tired and might be talking nonsense, but since we got a generalized version of the stream type, is there any need for an explicit AsyncStream type? Can't it be a type alias?

typealias AsyncStream<Element> = AsyncThrowingStream<Element, Never>

Can't the compiler already guarantee that such a type won't ever throw an error because of Never?

This would also mean that the restricted init from the previous message should move into extension AsyncThrowingStream where Failure == Error while probably also introducing another extension AsyncThrowingStream where Failure == Never.
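A minimal sketch of the extension-based shape, using a hypothetical `TypedStream` that simply stores an eagerly built result instead of streaming, to keep the focus on where the initializers can live. Untyped `throws` can only ever produce `Error`, so the throwing initializer sits in an extension `where Failure == Error`, while a non-throwing producer pairs naturally with `Failure == Never`. This does not address the iterator's conditional `throws`, which is the part the replies below say needs generic effects.

```swift
/// Hypothetical stand-in for a stream that is generic over its failure type.
/// It stores an eagerly built result rather than streaming elements.
struct TypedStream<Element, Failure: Error> {
    let elements: [Element]
    let failure: Failure?
}

extension TypedStream where Failure == Error {
    /// Untyped `throws` can only produce `Error`, so this initializer is only
    /// available when Failure == Error.
    init(_ build: () throws -> [Element]) {
        switch Result(catching: build) {
        case .success(let elements):
            self.init(elements: elements, failure: nil)
        case .failure(let error):
            self.init(elements: [], failure: error)
        }
    }
}

extension TypedStream where Failure == Never {
    /// A non-throwing producer pairs with Failure == Never: no failure is possible.
    init(_ build: () -> [Element]) {
        self.init(elements: build(), failure: nil)
    }
}
```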


A bit off-topic, but this also leads me to ask why we don't simply have:

typealias TaskGroup<ChildTaskResult> = ThrowingTaskGroup<ChildTaskResult, Never>

Sadly no, that would require a double conditional conformance of the iterator.


Off-topic again, but smells like another missing generic feature. :crossed_fingers: we can move the whole generics topic forwards for Swift 6.


Correct, the missing feature is generic effects (e.g. making a generic decision on whether a function throws, or whether a type's conformance throws).


After using AsyncStream and AsyncSequence a bit, I was wondering if it makes sense to introduce an AnyAsyncSequence type similar to the AnyPublisher of Combine. A common pattern in our existing code base is something like this:

protocol UserService: AnyObject {
    var userStream: AnyPublisher<User?, Never> { get } // replace with AnyAsyncSequence<User?, Never> if possible
}

@Philippe_Hausler Would adding such a type into the standard library make sense?

Perhaps, but I think the Any form would really need generalized effects to work properly, since no where clause today could produce the non-throwing eraser. My hope is that the opaque types work will lead us to a more general and efficient solution for erasing things with effects. AsyncStream, however, probably isn't fully that type (granted, it could partially serve as one).
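For reference, a throwing-only eraser can be written today. This is a minimal sketch; `AnyAsyncSequence` is a hypothetical name, not a standard library type. It shows the limitation described above: because `throws` cannot be made conditional on the base sequence, `next()` is always throwing, so even an erased non-throwing AsyncStream must be consumed with `for try await`.

```swift
/// Hypothetical type eraser over any AsyncSequence with a given Element.
/// next() is always `throws`: today there is no way to make that
/// conditional on whether the erased base sequence throws.
struct AnyAsyncSequence<Element>: AsyncSequence {
    struct AsyncIterator: AsyncIteratorProtocol {
        let produceNext: () async throws -> Element?

        mutating func next() async throws -> Element? {
            try await produceNext()
        }
    }

    private let makeBaseIterator: () -> AsyncIterator

    init<Base: AsyncSequence>(_ base: Base) where Base.Element == Element {
        makeBaseIterator = {
            // Each erased iterator owns a fresh iterator of the base sequence.
            var iterator = base.makeAsyncIterator()
            return AsyncIterator(produceNext: { try await iterator.next() })
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        makeBaseIterator()
    }
}
```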


(Sorry, I keep hitting the wrong key today; meant to change tab)

Overall +1 to the idea. I think it's a really valuable addition, and it's good to finally bring some kind of reactive programming to the standard library. I have some questions and suggestions, though:

  1. I agree with @ktoso about the name yield. yield is already a language keyword (for _modify), and would be great for non-async generators. Might I suggest send as an alternative?

  2. Speaking of sending, it's a little strange that the Element type doesn't need to be Sendable. The stream's task is able to send values to its consumer, and they are possibly running in parallel, so wouldn't it be easy to introduce a data race by sending a non-Sendable class instance across?

As with any sequence, iterating over an AsyncStream multiple times, or creating multiple iterators and iterating over them separately, may produce an unexpected series of values. Neither AsyncStream nor its iterator are @Sendable types, and concurrent iteration is considered a programmer error.

  3. Sequence is a protocol, so it's more understandable that it omits specifying what happens if you use it outside of spec (also, it's the worst part about that protocol, and the source of countless bugs). I'd hope for a little more from AsyncStream, being a concrete type. What does happen if you iterate an AsyncStream multiple times? A runtime error? An empty sequence?

  4. I wonder if it would be worth adding convenience methods on the continuation to send/yield all values from another AsyncStream/Sequence. You could just for await them and forward all the values, but I think having a built-in convenience function for including the elements from another stream/sequence is at least as valuable as the one for yielding Void, which is included.

    Also, it may be possible to avoid repeated buffering if an AsyncStream is being consumed by another AsyncStream.
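The suggested convenience could be sketched as an extension, assuming AsyncStream as it later shipped in Swift 5.5, where `Continuation.yield(_:)` and `finish()` exist and `yield(_:)` has a discardable result. The `yieldAll(from:)` name is hypothetical and not part of the proposal. The async overload simply forwards with `for await`, so it does not attempt the buffering optimization mentioned above.

```swift
extension AsyncStream.Continuation {
    /// Hypothetical helper: yields every element of a synchronous sequence, in order.
    func yieldAll<S: Sequence>(from elements: S) where S.Element == Element {
        for element in elements {
            yield(element)
        }
    }

    /// Hypothetical helper: forwards every element of another stream, in order,
    /// returning once that stream has finished. It does not call finish() itself.
    func yieldAll(from other: AsyncStream<Element>) async {
        for await element in other {
            yield(element)
        }
    }
}
```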

There has been discussion in the past about a model for typed throws where nonthrowing functions are equivalent to throws Never. Wouldn’t this approach also make it possible without having fully generalized effects?

Here is the post that you've just mentioned (I have it bookmarked):

That is correct, it will need to be Sendable but this suffers the same enforcement problem that TaskGroup hits - many adopters are yet to be Sendable. @Douglas_Gregor has indicated that we can address this issue when Sendable is actually enforced. I think from a details standpoint we really should require all elements of all AsyncSequences to be Sendable. The conformance here would just fall-out when that requirement is implemented.

It is an immediately terminal async sequence in that case.

That can't be done directly, since there is no way to enforce whether it is the throwing case or not. Doing so would be a reasonable extension once we gain generalized effects.


I did some exploration, took parts of this feedback very seriously with regard to how they could be implemented, and updated the proposal with some additional explanations and better examples, incorporating the feedback in this PR of changes to the proposal: [SE-0314] `AsyncStream` and `AsyncThrowingStream` Updates by phausler · Pull Request #1389 · apple/swift-evolution · GitHub
