When can we move `AsyncSequence` forward?

I have been heavily invested in implementing many parts of our new application logic using the new and shiny concurrency APIs and well I cannot express enough my current disappointment with it. In particular, the work with AsyncSequence is a very painful experience on our side.

Some may say that AsyncSequence is the Swift's open sourced Combine. While this is an interesting point of view, it's nothing more. AsyncSequence lacks a Failure: Error type compared to a Publisher. Sure there is something like @rethrows but it also does not cut in so many cases and in my honest opinion is just a quick patch to cover a few basic transformation cases.

Here is a list of some pain points I personally run into:

  • There is common return type at this moment (I personally use AsyncStream and AsyncThrowingStream), but I'd rather would use any / some AsyncSequence<Value, Failure>.
  • It's not possible to write extensions for non-failing async sequences.
  • Using transformation operators like map and others on non-failing async sequences has no easy way to wrap the sequence into an AsyncStream. It requires the user to:
    • Spawn a wrapping AsyncStream
    • Escape the AsyncStream.Continuation
    • Spawn a Task inside the build closure to wrap the transformed sequence
    • Properly configure all continuation events
    • And lastly to configure task cancellation when the wrapping stream gets cancelled
  • There is a huge amount of operators that is missing compared to Combine.
  • Task itself could conform to AsyncSequence if there was a Failure associated type as it could mimic Empty or Just types from Combine.
  • Writing own AsyncSequence types seems to be not trivial in many cases. (It could be me, but I have a feeling that this work requires the developer to guarantee thread safety manually, which isn't everyone's expertise.).
  • Combine can help here and there, but the conversion from AsyncSequence into a Publisher is also not present and not necessarily trivial.
  • Currently there are no blessed types that would replace CurrentValueSubject and PassThroughSubject from Combine.
  • There are probably more things I forgot now.

The lack of precise error typing shines brights through every single crack of AsyncSequence!

Please don't get me wrong, I appreciate the hard work the teams have done with Swift and async algorithms so far. I'm also not trying to bash on anything here. All I want is to express my experience with these APIs.

  • Is there any plans to allocate more resources in this area soon?
  • What's the experience been for others?

Concurrency continues to be priority 1 according to the focus areas of 2023, so I'm hopeful that some of these deficiencies are addressed in the coming year.

I'll try to offer some help when it comes to two points, though:

Can you explain more about this? The @rethrows mechanism allows you to propagate a throwing and/or non-throwing sequence via protocol composition.

There's a lesser-known initializer, init(unfolding:onCancel:), that lets you erase an async sequence to a stream without the overhead of tasks, etc. Here's a helper initializer that does just that (hat tip @tgrapperon):

extension AsyncStream {
  init<S: AsyncSequence>(_ sequence: S) where S.Element == Element {
    var iterator: S.AsyncIterator?
    self.init {
      if iterator == nil {
        iterator = sequence.makeAsyncIterator()
      return try? await iterator?.next()

At least I have no clue how you'd write an extension on AsyncSequence to erase the current async sequence type to AsyncStream as AsyncSequence has no Failure associated type which I could use to constrain my extension to.

// not possible
extension AsyncSequence where Failure == Never {
  func eraseToAsyncStream() -> AsyncStream<...>

Extensions on AsyncSequence always seem to introduce the Error back unless the current async sequence is captured as a Base and propagated to the new type.

Uff, I didn't know that this closure is called multiple times. The behavior doesn't seem to be documented. My impression was always that it will call once and that's it. Thank you for sharing this example.

Ah yeah, it's not currently possible to propagate through erasure. You need to decide if you want to ignore failure or not at the time of erasing to a stream.

Right, that's one of the problems I have with the current state of the APIs. I wish the failure propagation to be statically instead of ignoring errors silently.

Just remembered one missing thing and added it to list above:

  • Currently there are no blessed types that would replace CurrentValueSubject and PassThroughSubject from Combine.

Thanks for opening this post. I agree that there are some areas that need further work to make AsyncSequences really shine. I reply to some of the points you mentioned with my experience from working on swift-async-algorithms and swift-nio.

I agree that any/some AsyncSequence would be nice for APIs. Right now what we recommend for public APIs is not vending an AsyncStream though. Rather create your own type that conforms to AsyncSequence and back it by an AsyncStream. This allows you to switch implementations later on.

Why would you want to pipe the output of a transformation algorithm into AsyncStream is this because you are vending AsyncStreams in your APIs? If so I recommend the above approach.

Agreed. But that is just the nature of it. Root AsyncSequences are hard to implement since they have to implement proper thread safety. Algorithms are often easier but some of the more complex ones also need to be aware of this, e.g. merge or debounce. In some of my last implementations in swift-async-algorithms I used a state machine based approach that we use in NIO quite often. I can recommend it when you want to implement your own AsyncSequences.


Sounds very interesting, could you link to something specific as an example?

Sure you can find the implementation of the merge state machine here: swift-async-algorithms/MergeStateMachine.swift at main · apple/swift-async-algorithms · GitHub