SE-0314 (Second review): AsyncStream and AsyncThrowingStream

Hello, Swift community

The second review of SE-0314: AsyncStream and AsyncThrowingStream begins now and runs through June 28, 2021. The first review received a lot of very useful feedback. In response, the authors have made several changes to the proposal for the second review, summarized here:

  • added YieldResult to express the action of yielding’s impact, either something is enqueued, dropped or the continuation is already terminated
  • added init(unfolding: @escaping () async -> Element?) to offer an initializer for unfolding to handle back-pressure based APIs.
  • made AsyncThrowingStream generic on Failure but the initializers only afford for creation where Failure == Error
  • removed the example of DispatchSource signals since the other DispatchSource types might be actively harmful to use in any async context
  • initialization now takes a buffering policy to both restrict the buffer size as well as configure how elements are dropped

Reviews are an important part of the Swift evolution process. All review feedback should be either on this forum thread or, if you would like to keep your feedback private, directly to the review manager. If you do email me directly, please put "SE-0314" somewhere in the subject line.

What goes into a review?

The goal of the review process is to improve the proposal under review through constructive criticism and, eventually, determine the direction of Swift. When writing your review, here are some questions you might want to answer in your review:

  • What is your evaluation of the proposal?
  • Is the problem being addressed significant enough to warrant a change to Swift?
  • Does this proposal fit well with the feel and direction of Swift?
  • If you have used other languages or libraries with a similar feature, how do you feel that this proposal compares to those?
  • How much effort did you put into your review? A glance, a quick reading, or an in-depth study?

More information about the Swift evolution process is available at:

https://github.com/apple/swift-evolution/blob/master/process.md

Thank you for participating in improving Swift!

Doug

11 Likes

+1 from me. This remains a valuable addition to the language to adapt more code to the Swift Concurrency Model (as it was in the first review). I especially appreciate the precise control over buffering behavior.

Note for authors: I believe the “Buffering Values” section needs updated for the new buffering policy and the “Convenience Methods” section could gain value from a description of the new unfolding init.

1 Like

This proposal is based (depends) on AsyncSequence proposal. But that proposal also depends on rethrowing protocol conformance, which is just a pitch that haven't become an accepted proposal yet (or I may be wrong). Why do we have a formal proposal accepted with dependency on a pitch?

I've quickly read through the proposal (and have worked with asynchronous data streams in a previous life...) - I have two questions to begin with:

  1. When it comes to BufferingPolicy - has "conflation/merging" been considered as an additional policy? It's a different way to have some bound on the queue length, but would probably require some protocol for defining the merge operation on a type. We've found it a useful behaviour for data streams in general which can be more useful than just dropping data, while still keeping some bounds on the queue growth.

  2. We've found it useful to also be able to make a distinction for such data streams when an initial snapshot of data is done. This allows for use cases where one opens an async stream, receives all items initial state - then a snapshotDone - followed by 'live updates" to items. There doesn't seem to be any way to express that, has it been considered?

To be able to express both conflation/merging of items as well as being able to signal when an initial data set has been received would be useful IMHO.

2 Likes

I agree. Especially with the first part. It would be awesome with e.g. something like this:

    /// A strategy that handles exhaustion of a buffer’s capacity.
    public enum BufferingPolicy {
      case unbounded

      /// When the buffer is full, discard the newly received element.
      /// This enforces keeping the specified amount of oldest values.
      case bufferingOldest(Int)
      
      /// When the buffer is full, discard the oldest element in the buffer.
      /// This enforces keeping the specified amount of newest values.
      case bufferingNewest(Int)
      
      /// Only buffer a single element, updating it when new elements are received.
      case coalescing((Element, Element) -> Element)
    }

Or even case coalescing((Input, Output) -> Output)...

More options for buffering would be nice, but the ability to simply coalesce received elements seems very useful for a lot of use cases.

I guess you could even replace BufferingPolicy with a Buffer protocol or similar :slight_smile:

Hmm. I guess it's possible to create an AsyncSequence wrapper performing the coalescing... :smiley:

This is a good question: should streams intrinsically buffer vs combining :wink: streams with buffering “operators”?

2 Likes

Adding more fine grained control for buffering is 100% a valid request but I think it might be best served with its own type to add a more general “operator” to be applied to any AsyncSequence.

Generally the risk that we have here is that this type has buffering to account for the back pressure adaptation, and it has a distinct benefit from being as simple as possible. Doing so will allow users to safely construct streams of events from delegates or callbacks with low levels of friction.

It seems to me that we should keep some level of isolation of concerns to avoid scope creep of this type and encompass finer grained control over buffering in a progression to a type with more in depth control. For example: one idea I have tinkered with is to have a type that takes a protocol to manage buffering behavior - that way folks could do advanced buffering behavior that goes beyond what this type is capable of doing. The best place for that type of thing in my opinion is to have it as its own type (similar to map and friends) that adapts an AsyncSequence to an AsyncBufferedSequence.

There are numerous other advancements that have already been brought up like debounce and throttle that a fully featured buffer type could fit in quite nicely with.

As it currently stands the behaviors of AsyncStream are predictable and perhaps as complex as I think they should be.

To the suggestions so far - The number of dropped elements is either 1 or 0 so that indication is not needed. The terminal state cannot (without calling out in a critical context) be determined that it is an error state when a value is yielded, so that indicator is not reasonable without using some sort of intermediary actor delegation.

The additional suggestions are merit worthy for a more advanced buffer type in my opinion. I will record these ideas and add them to the bucket of other good concepts to follow up with.

6 Likes

Thanks, makes sense to avoid feature creep - only open question would be if such future extension direction impacts the composition of API.

Also, to clarify, snapshotDone is useful to reflect eg UI state that the full initial state has been received from a server or to trigger application logic when such a state is reached.

My hope is to be able to use async sequences as a solution for API for such use cases - would be very elegant vs the usual callback based ones.

I don't think it would inhibit that one bit. Another type representing the general application of transformation of an existing AsyncSequence into a buffered AsyncSequence would be purely additive to anything we have so far. AsyncStream would be perfectly reasonable to use in addition to any new buffering type and would likely steer the direction of that to be a more focused type on the behaviors of buffering.

2 Likes

We (mostly me) made a mistake; it didn't get on my concurrency proposals gist and got lost in the shuffle.

Doug

Also, as @ktoso points out in another thread shouldn't these be called "AsyncSources" rather than "AsyncStreams" ?

3 Likes

While I love the idea of having potentially infinite async streams, I still have one major issue with the current design. There is no mention in the proposal on how to preserve a strong reference to types that you want to wrap.

There is this one example (but with a typo):


extension QuakeMonitor {
  static var quakes: AsyncStream<Quake> {
    AsyncStream { continuation in
      let monitor = QuakeMonitor()
      monitor.quakeHandler { quake in
        continuation.yield(quake)
      }
      // should be `continuation` not `monitor`
      monitor.onTermination = { _ in 
        monitor.stopMonitoring // another typo, missing `()`
      }
      monitor.startMonitoring()
    }
  }
}

It took me a while to understand that it is onTermination that captures a strong reference to the monitor instance so that does not get deallocated.

How do we handle infinite streams which may not have something like a stop method? Should we capture the instance in this fashion?

continuation.onTermination = { _ in
  _ = myInstance // that seems like a strange workaround
}

Another typo: Search for AsyncSeries. I think there are more typos that I‘ve seen yesterday, but I can‘t find them atm.


Coming from frameworks like RxSwift and Combine, I would wish for some examples of hot and cold AsyncStream‘s.

2 Likes

Sorry for being late to the party again, I wish I had some more time during the initial pitch.


  • Why is the type called Continuation.Termination instead of Continuation.Completion (with name precedent to Subscribers.Completion from Combine)?

  • I feel like the consistent way would be to introduce this type:

    public enum Completion {
      case finished
      case cancelled
      case failure(Failure)
    }
    

    It would align great with the mentioned type from Combine. That also means that onTermination would become onCompletion.

  • It feels a bit strange that you can both yield(with:) a Result containing a potential error and finish(throwing:). I think the user should only be using one of these to complete with an error. It seems like yield(with:) is a convenience method where you want to stream errors without actually failing, but for that purpose you should use AsyncThrowingStream<Result<Element, Error>, Never> instead (or AsyncStream<Result<Element, Error>> for short).

  • I still struggle to understand why we need two types? I mean, I get that AsyncSequence would require two overlapping conformances which are not allowed, because we don't have typed throws. However there is a decent way to handle this in the meantime, we just have to teach the compiler a bit:

    extension AsyncThrowingStream: AsyncSequence {
      public struct Iterator: AsyncIteratorProtocol {
        let produce: () async -> Result<Element, Failure>?
    
        public mutating func next() async -> Result<Element, Failure>? {
          return try await produce()
        }
      }
      ...
    }
    
    public typealias AsyncStream<Element> = AsyncThrowingStream<Element, Never>
    
    // somewhere else
    
    // automatically promote `Result<Element, Failure>` to allow
    // the following syntax
    for try await value in asyncThrowingStream { ... }
    
    // automatically promote `Result<Element, Never>` to allow
    // the following syntax
    for await value in asyncStream { ... }
    

    IIRC a for in loop is already sugar code, so why can't we teach it to add a .get() call when you add a try on a sequence where the iterators element is a Result? It can also error out when you try adding try but the Result's Failure type equals Never.


Personal note: It feels like 'typed throws' are almost inevitable, and when we get them, I strongly feel that like Optional promotion, we should also promote Result where logically possible.


  • I think the ultimate goal for the types introduced in this proposal should be the ability to mimic most if not all Publisher types from Combine (maybe even Subscriber types).
    To name a few, I would expect being able to re-build types such as the following once by using AsyncThrowingStream: Just, Empty, Fail, CurrentValueSubject, PassthroughSubject, etc.
    Being able to move from reactive framework into the native async world and vice verse is what I would really want to achieve by utilizing these types.
6 Likes

The reason has to do with the conditional conformance to AsyncSequence for the iterator. We need two types because a type cannot conditionally conform twice for the sake of the iterator throwing or not.

e.g.

extension AsyncStream: AsyncSequence where Failure == Never {
	func makeAsyncIterator() -> NonThrowingIterator { ... }
}

extension AsyncStream: AsyncSequence  {
	func makeAsyncIterator() -> ThrowingIterator { ... }
}

This sadly does not work; rightfully so because it effectively means that there is ambiguity for which implementation it can use.

@Philippe_Hausler I'm not following the reason for two iterator types. The nested iterator types does capture the Failure type, which when it's Never, it would eliminate the error if we used Result<Element, Failure> as the iterators Element type instead. Is it not possible to extend the for in syntax to automatically transform Result into the throwing syntax that uses try by adding .get() to the sugar code?

let array: [Result<Int, Error>] = ...
for try value in array { ... } // NEW
for case .success(let value) in array { ... }

// the former would be sugar for
var iterator = array.makeIterator()
while let value = try iterator.next()?.get() {
  ...
}

I am not sure that will work for the rethrows cases though. Because your suggestion is inferring that we re-do all of the implementations of the other AsyncSequences added and re-do the compiler implementation.

Hmm. Now that you've mentioned the other sequences. Technically since we fixed the missing generic Failure type in this proposal, I kinda would assume that the other sequences (e.g AsyncThrowingFilterSequence) should also all have the Failure generic type parameter. It feels like an oversight in the flood of the whole giant async feature this year.

To be clear, I'm not trying to be rude or force more work on you and the team working on all these things, I'm just sharing my personal view point on how I as a consumer of these APIs could imagine those to function. I personally prefer as much generalization over generic types as possible.

3 Likes

I did a quick and dirty experiment, while I'm not sure about the effects of the @rethrows attribute, I had to technically adjust AsyncIteratorProtocol, that's why I used a custom _AsyncSequence which used my custom _AsyncIteratorProtocol. And I could fairly easily rewrite AsyncThrowingMapSequence.

You might want to look at it and think if it would be something that Swift might want to do now (if there's still a bit of time left):

// Only used because we needed a change made in `_AsyncIteratorProtocol`.
public protocol _AsyncSequence {
  associatedtype AsyncIterator: _AsyncIteratorProtocol
  associatedtype Element where Self.Element == Self.AsyncIterator.Element

  func makeAsyncIterator() -> Self.AsyncIterator
}

public protocol _AsyncIteratorProtocol {
  associatedtype Element
  associatedtype Failure: Error // NEW

  // NEW return type
  mutating func next() async -> Result<Self.Element, Failure>?
}


extension _AsyncSequence {
  @inlinable
  public __consuming func map<Transformed>(
    _ transform: @escaping (Element) async throws -> Transformed
  ) -> AsyncThrowingMapSequence<Self, Transformed, Error> {

    AsyncThrowingMapSequence(self) { element in
      // wrapping the value and failure into the `Result`
      do {
        return await .success(try transform(element))
      } catch {
        return .failure(error)
      }
    }
  }
}

public struct AsyncThrowingMapSequence<
  Base: _AsyncSequence,
  Transformed,
  Failure
> where Base.AsyncIterator.Failure == Failure {
  // Make sure we align the failure types. This is also true in `Combine`
  // and we would need a separate sequence to change / map the `Failure` type.
  @usableFromInline
  let base: Base

  @usableFromInline
  // Use a result type
  let transform: (Base.Element) async -> Result<Transformed, Failure>

  @usableFromInline
  init(
    _ base: Base,
    transform: @escaping (Base.Element) async -> Result<Transformed, Failure>
  ) {
    self.base = base
    self.transform = transform
  }
}

extension AsyncThrowingMapSequence: _AsyncSequence {

  public typealias Element = Transformed
  /// The type of iterator that produces elements of the sequence.
  public typealias AsyncIterator = Iterator

  /// The iterator that produces elements of the map sequence.
  public struct Iterator: _AsyncIteratorProtocol {
    @usableFromInline
    var baseIterator: Base.AsyncIterator

    @usableFromInline
    let transform: (Base.Element) async -> Result<Transformed, Failure>

    @usableFromInline
    var finished = false

    @usableFromInline
    init(
      _ baseIterator: Base.AsyncIterator,
      transform: @escaping (Base.Element) async -> Result<Transformed, Failure>
    ) {
      self.baseIterator = baseIterator
      self.transform = transform
    }

    @inlinable
    public mutating func next() async -> Result<Transformed, Failure>? {
      guard
        !finished,
        let result = await baseIterator.next()
      else {
        return nil
      }
      switch result {
      case .success(let element):
        return await transform(element)
      case .failure(let error):
        finished = true
        return .failure(error)
      }
    }
  }

  @inlinable
  public __consuming func makeAsyncIterator() -> Iterator {
    return Iterator(base.makeAsyncIterator(), transform: transform)
  }
}

If and when we get typed throws. All we would need to do, will be a simple overload that will already use an existing and compatible generic type.

extension _AsyncSequence {
  @_alwaysEmitIntoClient // back deployment ?? 🙂
  @inlinable
  public __consuming func map<Transformed, Failure: Error>(
    _ transform: @escaping (Element) async throws Failure -> Transformed
  ) -> AsyncThrowingMapSequence<Self, Transformed, Failure> { ... }
}

That seems like a reasonable answer here. Use

continuation.onTermination = { [myInstance] in
}

to make it very explicit that our goal is to capture myInstance.

Doug

1 Like