[Pitch] Enhancing Async{Throwing}Stream

Summary of changes

This proposal introduces the following changes:

  1. Typed throws support for AsyncThrowingStream.
  2. Update the unfolding initializers: add the missing onCancel parameter to AsyncThrowingStream, adopt nonisolated(nonsending), and replace onCancel’s @Sendable requirement with sending.
  3. Terminate the stream when its continuation is discarded.
  4. Hashable conformance for Async{Throwing}Stream and their nested types.

Motivation

Typed Throws

AsyncThrowingStream type-erases thrown errors to any Error, which requires additional boilerplate to preserve the error’s concrete type and to integrate the stream into typed-throws contexts.

let locationStream = AsyncThrowingStream<Location, LocationError> { ... } // Error: Initializer 'init(_:bufferingPolicy:_:)' requires the types 'LocationError' and 'any Error' be equivalent

func processLocations() async throws(LocationError) {
  for try await location in locationStream { // Error: Thrown expression type 'any Error' cannot be converted to error type 'LocationError'
    ...
  }
}

There are two suboptimal workarounds.

  1. Type cast:
let locationStream = AsyncThrowingStream<Location, any Error> { ... }

func processLocations() async throws(LocationError) {
  do {
    for try await location in locationStream {
      ...
    }
  } catch {
    throw error as! LocationError
  }
}
  2. Result type:
let locationStream = AsyncStream<Result<Location, LocationError>> { ... }

func processLocations() async throws(LocationError) {
  for await result in locationStream {
    switch result {
    case .success(let value):
      ...
    case .failure(let locationError):
      throw locationError
    }
  }
}
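For comparison, here is a self-contained, runnable version of the Result-based workaround. It is a sketch that assumes Swift 6 (for typed throws); Location, LocationError, LocationResult, and makeLocationStream(count:failing:) are hypothetical stand-ins for the elided parts above. The producer encodes failures as values, so the consumer can rethrow them with their concrete type, at the cost of a switch on every element.

```swift
// Hypothetical stand-ins for the pitch's Location and LocationError types.
struct Location: Equatable {
    var latitude: Double
    var longitude: Double
}

enum LocationError: Error, Equatable {
    case unavailable
}

typealias LocationResult = Result<Location, LocationError>

// Producer: failures travel through the stream as `.failure` values.
func makeLocationStream(count: Int, failing: Bool) -> AsyncStream<LocationResult> {
    AsyncStream<LocationResult> { continuation in
        for index in 0..<count {
            continuation.yield(.success(Location(latitude: Double(index), longitude: 0)))
        }
        if failing {
            continuation.yield(.failure(.unavailable))
        }
        continuation.finish()
    }
}

// Consumer: unwraps each Result and rethrows the concrete error type.
func processLocations(_ stream: AsyncStream<LocationResult>) async throws(LocationError) -> [Location] {
    var received: [Location] = []
    for await result in stream {
        switch result {
        case .success(let location):
            received.append(location)
        case .failure(let locationError):
            throw locationError
        }
    }
    return received
}
```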

Unfolding initializer

SE-0314 proposed the following unfolding initializers:

// AsyncStream
public init(
  unfolding produce: @escaping () async -> Element?, 
  onCancel: (@Sendable () -> Void)? = nil
)

// AsyncThrowingStream
public init(
  unfolding produce: @escaping () async throws -> Element?, 
  onCancel: (@Sendable () -> Void)? = nil
)

However, the onCancel parameter of the AsyncThrowingStream initializer was never implemented, creating a discrepancy between the two variants. Furthermore, SE-0338 changed the execution semantics of nonisolated asynchronous functions: such functions formally run on the Global Concurrent Executor (GCE), which introduces additional and potentially unnecessary actor hops. Finally, the @Sendable requirement on onCancel is overly strict, because onCancel is invoked at most once and therefore never concurrently with itself.

let nonThrowingStream = AsyncStream {
  ...
} onCancel: {
  ...
}

let throwingStream = AsyncThrowingStream {
  ...
} // No `onCancel` parameter

func process<E>(stream: AsyncStream<E>, on actor: isolated StreamActor) async { // starts running on `actor`
  for await element in stream { // Implicit call to `produce`, hop off `actor`
    actor.process(element) // Hop back to `actor`
  }
}

The process(stream:on:) function is actor-isolated via its explicit isolated parameter. This means its formal isolation is that of the passed-in actor instance. However, the for await-in loop implicitly calls the nonisolated asynchronous produce function-type parameter to receive the next element. As a result, process(stream:on:) continuously hops off and back onto the actor for each iteration.
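For reference, this is how the unfolding initializers can be used today. It is a sketch that assumes the current Swift 6 standard library signatures; the Countdown actor and the two factory functions are hypothetical. The non-throwing variant accepts onCancel, while the throwing variant offers nowhere to attach one.

```swift
// Hypothetical producer state. An actor keeps the mutable counter safe to
// access from the produce closure, which is @Sendable in Swift 6.
actor Countdown {
    private var remaining: Int

    init(from start: Int) {
        remaining = start
    }

    // Returns the next value, or nil once the countdown is exhausted.
    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

// AsyncStream's unfolding initializer accepts an onCancel handler.
func makeCountdownStream(from start: Int) -> AsyncStream<Int> {
    let countdown = Countdown(from: start)
    return AsyncStream(
        unfolding: { await countdown.next() },
        onCancel: { print("countdown cancelled") }
    )
}

// AsyncThrowingStream's unfolding initializer currently has no onCancel
// parameter, so a cancellation handler cannot be attached here.
func makeThrowingCountdownStream(from start: Int) -> AsyncThrowingStream<Int, any Error> {
    let countdown = Countdown(from: start)
    return AsyncThrowingStream(unfolding: { await countdown.next() })
}
```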

Continuation and Stream Termination

If the continuation of an active stream is discarded without calling finish(), the only remaining way to terminate the stream is by cancelling the consuming task.

let stream = AsyncStream<Int> { continuation in
  continuation.onTermination = { reason in 
    print(reason)
  }

  for number in 0..<10 {
    continuation.yield(number)
  }
} // continuation discarded here

for await element in stream { // Indefinitely suspended
  print(element) // prints: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
} 

Unless the consumer's task is cancelled, the for await-in loop remains suspended indefinitely after receiving the last element.
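With the current behavior, the producer has to finish the stream explicitly. The sketch below is the same example with that one call added; makeFiniteStream() is a hypothetical name. Because finish() is called, onTermination receives .finished and the loop exits after the tenth element.

```swift
// Current workaround: the producer must call finish() itself; otherwise the
// consumer stays suspended after the last buffered element.
func makeFiniteStream() -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        continuation.onTermination = { reason in
            print(reason) // .finished here, because the producer finished the stream
        }

        for number in 0..<10 {
            continuation.yield(number)
        }

        continuation.finish() // terminates the stream once the buffer is drained
    }
}
```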

Hashable conformance

Adding Hashable conformance would allow these types to be stored in Hashable-conforming types and used as Set elements or Dictionary keys. Additionally, the implied Equatable conformance would allow direct comparison, e.g., in tests.

Proposed solution

Typed Throws

AsyncThrowingStream already declares a generic parameter Failure: Error. However, its existing initializers and makeStream function constrain Failure to any Error. This proposal adds new initializers and a makeStream function without that constraint, eliminating the boilerplate shown above and enabling seamless use in typed-throws contexts.

let locationStream = AsyncThrowingStream<Location, LocationError> { ... }

func processLocations() async throws(LocationError) {
  for try await location in locationStream {
    ...
  }
}

Unfolding Initializer

The missing onCancel parameter will be added to AsyncThrowingStream, making the unfolding initializer API consistent between the throwing and non-throwing variants. Furthermore, this proposal adopts nonisolated(nonsending) for produce. As thoroughly explained in SE-0461, this allows produce to run on the caller’s actor, eliminating unnecessary actor hops. Additionally, onCancel’s @Sendable requirement will be replaced with sending, while produce remains @Sendable.

let locationStream = Async{Throwing}Stream { // Consistent API
  ...
} onCancel: {
  ...
}

for {try} await location in locationStream { // Executes on the caller's actor
  ...
}

Terminate Stream when Continuation Goes Out of Scope

The continuation-based variant’s implementation will be changed to track internally whether there are any outstanding references to the stream’s continuation, that is, the continuation itself and any copies of it. When the last reference is discarded, the stream is cancelled and transitions into a terminal state. If the stream has already terminated by then, discarding the last reference has no effect.

let stream = AsyncStream<Int> { continuation in
  continuation.onTermination = { reason in 
    print(reason)
  }

  for number in 0..<10 {
    continuation.yield(number)
  }
} // continuation discarded here

for await element in stream {
  print(element) // prints: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
} // `onTermination` invoked with `.cancelled`

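Because the continuation is not retained beyond the build closure, the stream is cancelled once its last reference is discarded. The for await-in loop still receives the ten buffered elements and then exits instead of suspending indefinitely.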
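A rough approximation of the proposed behavior is possible today from the producer side. The sketch below assumes a hypothetical ContinuationOwner class (not part of the pitch or the standard library) whose deinit calls finish(). It differs from the pitch in two ways: it only tracks references to the owner rather than to every copy of the continuation, and it reports .finished instead of .cancelled, because the public Continuation API offers no way for a producer to cancel the stream.

```swift
// Hypothetical helper: finishes the stream when the last reference to the
// owner is released. ARC, not the stream, does the reference tracking here.
final class ContinuationOwner<Element> {
    private let continuation: AsyncStream<Element>.Continuation

    init(_ continuation: AsyncStream<Element>.Continuation) {
        self.continuation = continuation
    }

    func yield(_ value: Element) {
        continuation.yield(value)
    }

    deinit {
        // The pitch would cancel here; finish() is the closest public equivalent.
        continuation.finish()
    }
}

func makeAutoFinishingStream(count: Int) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        let owner = ContinuationOwner(continuation)
        for number in 0..<count {
            owner.yield(number)
        }
        // `owner` is released no later than the end of this closure,
        // and its deinit finishes the stream.
    }
}
```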

Detailed design

This proposal will add two additional initializers and one additional static makeStream function to AsyncThrowingStream. It is currently not possible to remove the Failure == any Error constraint from the existing declarations while maintaining backward compatibility. The @Sendable requirement on onCancel will be replaced with sending, and Hashable conformance will be added to Async{Throwing}Stream and their nested types.

Updated:

extension AsyncStream {
  public init(
    unfolding produce: nonisolated(nonsending) @escaping @Sendable () async -> Element?,
    onCancel: sending (() -> Void)? = nil
  )
}

extension AsyncThrowingStream {
  public init(
    unfolding produce: nonisolated(nonsending) @escaping @Sendable () async throws(Failure) -> Element?,
    onCancel: sending (() -> Void)? = nil
  ) where Failure == any Error
}

New:

extension AsyncThrowingStream {
  public init(
    unfolding produce: nonisolated(nonsending) @escaping @Sendable () async throws(Failure) -> Element?,
    onCancel: sending (() -> Void)? = nil
  )

  public init(
    of elementType: Element.Type = Element.self,
    throwing failureType: Failure.Type = Failure.self,
    bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded,
    _ build: (Continuation) -> Void
  )

  public static func makeStream(
    of elementType: Element.Type = Element.self,
    throwing failureType: Failure.Type = Failure.self,
    bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
  ) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation)
}

Hashable conformance:

// AsyncStream

extension AsyncStream: Hashable {
  public static func == (lhs: Self, rhs: Self) -> Bool {
    return lhs.context === rhs.context
  }

  public func hash(into hasher: inout Hasher) {
    hasher.combine(ObjectIdentifier(self.context))
  }
}

extension AsyncStream.Continuation.BufferingPolicy: Hashable {}

extension AsyncStream.Continuation.YieldResult: Equatable where Element: Equatable {}
extension AsyncStream.Continuation.YieldResult: Hashable where Element: Hashable {}

// AsyncThrowingStream

extension AsyncThrowingStream: Hashable {
  public static func == (lhs: Self, rhs: Self) -> Bool {
    return lhs.context === rhs.context
  }

  public func hash(into hasher: inout Hasher) {
    hasher.combine(ObjectIdentifier(self.context))
  }
}

extension AsyncThrowingStream.Continuation.BufferingPolicy: Hashable {}

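extension AsyncThrowingStream.Continuation.YieldResult: Equatable where Element: Equatable {}
extension AsyncThrowingStream.Continuation.YieldResult: Hashable where Element: Hashable {}

extension AsyncThrowingStream.Continuation.Termination: Equatable where Failure: Equatable {}
extension AsyncThrowingStream.Continuation.Termination: Hashable where Failure: Hashable {}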
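The conformances above compare streams by the identity of their internal context object. Because that storage is internal to the standard library, the runnable sketch below mirrors the same pattern on a hypothetical StreamHandle wrapper: copies share one class instance and therefore compare equal, while independently created handles do not.

```swift
// Hypothetical wrapper mirroring identity-based Hashable: equality and hashing
// are defined by the identity of a shared reference-type storage object.
struct StreamHandle<Element>: Hashable {
    private final class Storage {
        let stream: AsyncStream<Element>
        let continuation: AsyncStream<Element>.Continuation

        init() {
            let pair = AsyncStream<Element>.makeStream()
            stream = pair.stream
            continuation = pair.continuation
        }
    }

    private let storage: Storage

    init() {
        storage = Storage()
    }

    var stream: AsyncStream<Element> { storage.stream }
    var continuation: AsyncStream<Element>.Continuation { storage.continuation }

    // Same storage instance means same stream.
    static func == (lhs: Self, rhs: Self) -> Bool {
        lhs.storage === rhs.storage
    }

    // Hash the object identity, consistent with ==.
    func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(storage))
    }
}
```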

Implications on adoption

Terminating the stream implicitly when the stream’s continuation is discarded would break code that relies on the current behavior, for example to create an indefinite suspension point.
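Code that needs an indefinite suspension point could avoid relying on a discarded continuation by keeping the continuation alive and finishing it explicitly on cancellation. A minimal sketch, assuming a hypothetical Task.suspendUntilCancelled() helper; withTaskCancellationHandler and AsyncStream.makeStream are existing standard library APIs. Since the onCancel closure holds the continuation until the task is cancelled, this should behave the same under the current and the proposed semantics.

```swift
extension Task where Success == Never, Failure == Never {
    /// Hypothetical helper: suspends until the current task is cancelled.
    static func suspendUntilCancelled() async {
        let (stream, continuation) = AsyncStream<Never>.makeStream()
        await withTaskCancellationHandler {
            // Never yields; only ends once the stream terminates.
            for await _ in stream {}
        } onCancel: {
            // Capturing the continuation keeps it alive until cancellation,
            // then terminates the stream explicitly.
            continuation.finish()
        }
    }
}
```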

Future directions

~Copyable Support

In principle, it should be possible to support ~Copyable element types. However, several blockers currently prevent their adoption. The key issue is the lack of support for iterating over a ~Copyable sequence: it is not as simple as declaring {Async}Sequence’s Element associated type as ~Copyable, and changes to the underlying compiler implementation would be required.

However, progress is being made in other areas: Swift Collections now includes unique variants of Deque, and there is ongoing discussion about moving UniqueArray into the standard library. SE-0528 introduced a ~Copyable continuation type.

Further Consideration

Add an Element associated value to the .terminated case

When calling yield(_:) after the stream has been terminated, the yielded value is simply lost. A terminated stream rejects any new values and returns a YieldResult of .terminated. Unlike the .dropped(Element) case, .terminated does not return the rejected value.

While it is possible to save the value by retaining a copy and only discarding it once the value has actually been yielded to the stream, this approach would not work with ~Copyable types. A possible solution would be to modify the .terminated case to include an associated value of type Element.
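The retain-a-copy approach can be written against today's YieldResult for copyable elements. The hypothetical yieldOrReturn(_:to:) helper below returns the element the stream did not accept, or nil if it was enqueued. Note that for .dropped the stream returns whichever element it evicted, which with .bufferingNewest is an older buffered element rather than the one just yielded.

```swift
// Hypothetical helper: yields `value` and hands back any element the stream refused.
func yieldOrReturn<Element>(
    _ value: Element,
    to continuation: AsyncStream<Element>.Continuation
) -> Element? {
    switch continuation.yield(value) {
    case .enqueued:
        return nil // accepted by the stream
    case .dropped(let dropped):
        return dropped // buffer full: the stream returns the element it dropped
    case .terminated:
        return value // stream already terminated: our retained copy is the only one left
    @unknown default:
        return value
    }
}
```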

Provide an onTermination version for the unfolding initializer

In addition to the onCancel handler of the unfolding initializer, which is invoked only when the consumer’s task is cancelled, we could offer an onTermination variant that behaves like the one on the continuation-based variant.

It seems unfortunate to continue the AsyncStream/AsyncThrowingStream nomenclature into a typed-throws world, since it'll be possible to construct an AsyncThrowingStream<Element, Never> that doesn't actually throw. Ideally AsyncStream<Element, Failure> (or some new basename) would cover both cases.

It doesn't seem like the proposed change to end the stream on continuation discard would be backward-compatible. For example, in my test code I have

extension Task where Success == Never, Failure == Never {
    static func sleepUntilCancelled() async {
        let stream = AsyncStream<Never> { _ in }
        for await _ in stream {}
    }
}

(to work around the Task.sleep(nanoseconds: .max) bug). It seems like this would not sleep at all after this proposal. Maybe another reason to choose a new basename for the typed-throws variant.

It’s unfortunate. If only we had the ability to define default values for generic type parameters. I also want to add that this pitch is part of a broader effort to reimplement Async{Throwing}Stream, with open PRs that unify the internal implementation. It shouldn’t be too difficult to introduce a new base-name or unify the existing behavior into AsyncStream at some point.

Yes, that’s correct. With the proposed change, the for await-in loop would not suspend at all, and the stream would terminate instantly. I’m reluctant to suggest a new base-name, since, in my opinion, AsyncStream is already the perfect name. However, we could guard the change behind an upcoming feature flag.

I am not sure this change is actually safe. Async[Throwing]Stream supports multiple concurrent iterators, so the unfolding closure can be called concurrently, right?

Yes, that's true. My bad, I forgot to clarify that this only applies to onCancel. produce will remain @Sendable. If you look at the detailed design section, you'll see that only onCancel is sending. I updated the pitch.

Could you clarify what you mean by this? I was under the assumption that Async[Throwing]Stream does not really support multiple downstream subscribers. This thread seems to support this.

I do understand that it is possible to create multiple “weakly defined” iterators, though. Maybe that is something a review of the existing API could solve?

Async{Throwing}Stream was always a single-consumer API. You can construct a stream either from a continuation closure (complicated history) or from an asynchronous closure. These two variants only share the same public API, and internally they have separate implementations. The async variant, unlike the continuation-based variant, has always allowed weakly defined concurrent iteration and wasn’t @Sendable until Swift 6.0, which allowed for potential data races.

Edit: Here is an example of a race: Godbolt

To clarify the position I was taking there: I was speaking retrospectively. As it is today, folks do use the consuming nature of AsyncStream's iteration. That is a feature that is very reasonably used and should still be supported. Hard asserting on it would undoubtedly cause regressions in client code.

The Continuation and Stream Termination change is very interesting. The current implementation can easily lead to subtle bugs. Adding a new makeAutoTerminatingStream() method could help make sure existing code keeps the current behavior. Not sure if that’s good or not.

I haven't given it much thought yet, but this would require us to provide even more new initializers in addition to two new static functions. It would also make it difficult to eventually make the proposed change the default. I think putting it behind a feature flag is the most sensible approach. But, I will add the suggestion to the alternatives considered section.

@John_McCall do you think this change would be viable to include? Technically, it is source-breaking.

Probably not as the language currently stands, no.

Thanks for the info!