Enhancing Async{Throwing}Stream
Summary of changes
This proposal introduces the following changes:
- Typed throws support for AsyncThrowingStream.
- Update the unfolding initializer by adopting nonisolated(nonsending) and replacing onCancel’s @Sendable requirement with sending.
- Terminate the stream when its continuation is discarded.
- Hashable conformance for Async{Throwing}Stream and subtypes.
Motivation
Typed Throws
AsyncThrowingStream currently type-erases thrown errors to any Error, so preserving the error’s type and using the stream in typed-throws contexts requires additional boilerplate code.
let locationStream = AsyncThrowingStream<Location, LocationError> { ... } // Error: Initializer 'init(_:bufferingPolicy:_:)' requires the types 'LocationError' and 'any Error' be equivalent
func processLocations() async throws(LocationError) {
    for try await location in locationStream { // Error: Thrown expression type 'any Error' cannot be converted to error type 'LocationError'
        ...
    }
}
There are two suboptimal workarounds.
- Type cast:
let locationStream = AsyncThrowingStream<Location, any Error> { ... }
func processLocations() async throws(LocationError) {
    do {
        for try await location in locationStream {
            ...
        }
    } catch {
        throw error as! LocationError
    }
}
- Result type:
let locationStream = AsyncStream<Result<Location, LocationError>> { ... }
func processLocations() async throws(LocationError) {
    for await result in locationStream {
        switch result {
        case .success(let value):
            ...
        case .failure(let locationError):
            throw locationError
        }
    }
}
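As a rough illustration of how much scaffolding the Result workaround needs, the following sketch wraps it in a reusable helper. The names Location, LocationError, and collect(_:) are hypothetical and exist only for this example. It assumes a Swift 6 toolchain with typed throws and is not part of the proposal.

```swift
// Hypothetical error and element types, used only for illustration.
enum LocationError: Error, Equatable {
    case unavailable
}

struct Location: Equatable {
    var latitude: Double
    var longitude: Double
}

// Hypothetical helper: drains a Result-carrying stream and rethrows the
// first failure with its concrete type instead of `any Error`.
func collect<Element: Sendable, Failure: Error>(
    _ stream: AsyncStream<Result<Element, Failure>>
) async throws(Failure) -> [Element] {
    var elements: [Element] = []
    for await result in stream {
        switch result {
        case .success(let element):
            elements.append(element)
        case .failure(let error):
            throw error
        }
    }
    return elements
}

let locationResults = AsyncStream<Result<Location, LocationError>> { continuation in
    continuation.yield(.success(Location(latitude: 1, longitude: 2)))
    continuation.yield(.failure(.unavailable))
    continuation.finish()
}

var caught: LocationError? = nil
do {
    _ = try await collect(locationResults)
} catch {
    // `error` is statically typed as LocationError here, so no cast is needed.
    caught = error
}
```

Even with such a helper, every producer still has to wrap its values in Result, which is the boilerplate typed throws support is meant to remove.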
Unfolding initializer
SE-0314 proposed the following unfolding initializers:
// AsyncStream
public init(
    unfolding produce: @escaping () async -> Element?,
    onCancel: (@Sendable () -> Void)? = nil
)
// AsyncThrowingStream
public init(
    unfolding produce: @escaping () async throws -> Element?,
    onCancel: (@Sendable () -> Void)? = nil
)
However, the onCancel parameter for the AsyncThrowingStream initializer was never implemented, creating a discrepancy between the two variants. Furthermore, SE-0338 changed the execution semantics of nonisolated asynchronous functions by specifying that such functions formally run on the Global Concurrent Executor (GCE), introducing additional and potentially unnecessary actor hops. Additionally, the @Sendable requirement on onCancel is overly strict, as onCancel is invoked at most once and therefore never concurrently with itself.
let nonThrowingStream = AsyncStream {
    ...
} onCancel: {
    ...
}
let throwingStream = AsyncThrowingStream {
    ...
} // No `onCancel` parameter
func process<E>(stream: AsyncStream<E>, on actor: isolated StreamActor) { // starts running on `actor`
    for await element in stream { // Implicit call to `produce`, Hop off `actor`
        actor.process(element) // Hop back to `actor`
    }
}
The process(stream:on:) function is actor-isolated via its explicit isolated parameter. This means its formal isolation is that of the passed-in actor instance. However, the for await-in loop implicitly calls the nonisolated asynchronous produce function-type parameter to receive the next element. As a result, process(stream:on:) continuously hops off and back onto the actor for each iteration.
Continuation and Stream Termination
When the continuation of an active stream is discarded, the only remaining way to terminate the stream is task cancellation.
let stream = AsyncStream<Int> { continuation in
    continuation.onTermination = { reason in
        print(reason)
    }
    for number in 0..<10 {
        continuation.yield(number)
    }
} // continuation discarded here
for await element in stream { // Indefinitely suspended
    print(element) // prints: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
}
Unless the consumer's task is cancelled, the for await-in loop remains suspended indefinitely once the buffered elements have been consumed.
Hashable conformance
Adding Hashable conformance would allow these types to be stored in Hashable conforming types and used in collections like Set and Dictionary. Additionally, the inherited Equatable conformance will allow for direct comparison operations, e.g., for use in testing.
Proposed solution
Typed Throws
AsyncThrowingStream already defines a type parameter Failure: Error. However, until now, Failure has been constrained to any Error. This proposal extends AsyncThrowingStream with new unconstrained initializers and functions, eliminating existing boilerplate code and enabling seamless use in typed contexts.
let locationStream = AsyncThrowingStream<Location, LocationError> { ... }
func processLocations() async throws(LocationError) {
    for try await location in locationStream {
        ...
    }
}
Unfolding Initializer
The missing onCancel parameter will be added to AsyncThrowingStream, making the unfolding initializer API consistent between the throwing and non-throwing variants. Furthermore, this proposal adopts nonisolated(nonsending). As thoroughly explained in SE-0461, this will allow produce to run on the caller’s actor, eliminating unnecessary actor hops. Additionally, @Sendable will be replaced with sending.
let locationStream = Async{Throwing}Stream { // Consistent API
    ...
} onCancel: {
    ...
}
for {try} await location in locationStream { // Executes on the caller's actor
    ...
}
Terminate Stream when Continuation Goes Out of Scope
The continuation-based variant’s implementation will be altered to internally keep track of whether there are any outstanding references to the stream’s continuation, that is, the continuation itself and any copies of it. When the last reference is discarded, the stream is cancelled and transitions into a terminal state. If the stream has already been terminated before the last reference is discarded, discarding it has no effect.
let stream = AsyncStream<Int> { continuation in
    continuation.onTermination = { reason in
        print(reason)
    }
    for number in 0..<10 {
        continuation.yield(number)
    }
} // continuation discarded here
for await element in stream {
    print(element) // prints: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
} // `onTermination` invoked with `.cancelled`
The stream is terminated via cancellation after the for-in loop, as the continuation is not retained for further use.
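Until such tracking exists, similar behavior can be approximated in user code. The sketch below uses a hypothetical wrapper class, FinishingContinuation, whose deinit finishes the stream when the last reference to the wrapper is released. It is only an approximation under stated assumptions: it calls finish() rather than cancelling, so onTermination would report .finished instead of .cancelled, and it does not track copies of the raw continuation.

```swift
// Hypothetical wrapper, not part of the proposal: finishes the stream when
// the last reference to the wrapper goes away.
final class FinishingContinuation<Element: Sendable>: Sendable {
    private let continuation: AsyncStream<Element>.Continuation

    init(_ continuation: AsyncStream<Element>.Continuation) {
        self.continuation = continuation
    }

    func yield(_ value: Element) {
        continuation.yield(value)
    }

    deinit {
        // Elements that are already buffered are still delivered to the consumer.
        continuation.finish()
    }
}

let (numbers, rawContinuation) = AsyncStream<Int>.makeStream()
var producer: FinishingContinuation<Int>? = FinishingContinuation(rawContinuation)
for number in 0..<3 {
    producer?.yield(number)
}
producer = nil // last reference released; deinit finishes the stream

var received: [Int] = []
for await element in numbers {
    received.append(element)
}
print(received) // prints: [0, 1, 2]
```

Because the wrapper finishes the stream on release, the for await-in loop ends after the buffered elements instead of suspending indefinitely.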
Detailed design
This proposal adds two initializers and one static makeStream function to AsyncThrowingStream. They are added alongside the existing APIs because the Failure == any Error constraint on those APIs cannot currently be removed while maintaining backward compatibility. The @Sendable requirement on onCancel will be replaced with sending, and Hashable conformance will be added to Async{Throwing}Stream and its subtypes.
Updated:
extension AsyncStream {
    public init(
        unfolding produce: nonisolated(nonsending) @escaping @Sendable () async -> Element?,
        onCancel: sending (() -> Void)? = nil
    )
}
extension AsyncThrowingStream {
    public init(
        unfolding produce: nonisolated(nonsending) @escaping @Sendable () async throws(Failure) -> Element?,
        onCancel: sending (() -> Void)? = nil
    ) where Failure == any Error
}
New:
extension AsyncThrowingStream {
    public init(
        unfolding produce: nonisolated(nonsending) @escaping @Sendable () async throws(Failure) -> Element?,
        onCancel: sending (() -> Void)? = nil
    )
    public init(
        of elementType: Element.Type = Element.self,
        throwing failureType: Failure.Type = Failure.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded,
        _ build: (Continuation) -> Void
    )
    public static func makeStream(
        of elementType: Element.Type = Element.self,
        throwing failureType: Failure.Type = Failure.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation)
}
Hashable conformance:
// AsyncStream
extension AsyncStream: Hashable {
    public static func == (lhs: Self, rhs: Self) -> Bool {
        return lhs.context === rhs.context
    }
    public func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(self.context))
    }
}
extension AsyncStream.Continuation.BufferingPolicy: Hashable {}
extension AsyncStream.Continuation.YieldResult: Equatable, Hashable where Element: Equatable, Element: Hashable {}
// AsyncThrowingStream
extension AsyncThrowingStream: Hashable {
    public static func == (lhs: Self, rhs: Self) -> Bool {
        return lhs.context === rhs.context
    }
    public func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(self.context))
    }
}
extension AsyncThrowingStream.Continuation.BufferingPolicy: Hashable {}
extension AsyncThrowingStream.Continuation.YieldResult: Equatable, Hashable where Element: Equatable, Element: Hashable {}
extension AsyncThrowingStream.Continuation.Termination: Equatable, Hashable where Failure: Hashable, Failure: Equatable {}
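The stream conformances above compare and hash by the identity of the stream's reference-typed storage, not by its elements. Since that storage is internal to the standard library, the following sketch uses hypothetical stand-ins (Context and Handle) to illustrate what identity-based semantics imply for copies and for independently created values.

```swift
// Hypothetical stand-in for the stream's internal, reference-typed storage.
final class Context {}

// Hypothetical value type that, like the proposed conformance, compares and
// hashes by the identity of its shared storage.
struct Handle: Hashable {
    let context = Context()

    static func == (lhs: Self, rhs: Self) -> Bool {
        lhs.context === rhs.context
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(context))
    }
}

let first = Handle()
let copy = first      // shares the same context as `first`
let second = Handle() // has its own context

print(first == copy)                     // prints: true
print(first == second)                   // prints: false
print(Set([first, copy, second]).count)  // prints: 2
```

Under these semantics, two copies of one stream would compare equal, while two separately created streams would not, regardless of the elements they produce.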
Implications on adoption
Terminating the stream implicitly when the stream’s continuation is discarded would break code that relies on the current behavior, for example to create an indefinite suspension point.
Future directions
~Copyable Support
In principle, it should be possible to support ~Copyable types, but several blockers currently prevent their adoption. The key issue is the lack of support for iterating over a ~Copyable sequence. It is not as simple as declaring {Async}Sequence’s Element associated type as ~Copyable. Changes to the underlying compiler implementation would be required.
However, progress is being made in other areas: Swift Collections now includes unique variants of Deque, and there is ongoing discussion about moving UniqueArray into the standard library. SE-0528 introduced a ~Copyable continuation type.
Further Consideration
Add an Element associated value to the .terminated case
When calling yield(_:) after the stream has been terminated, the yielded value is simply lost. A terminated stream rejects any new values and returns a YieldResult of .terminated. Unlike the .dropped(Element) case, .terminated does not return the rejected value.
While it is possible to save the value by retaining a copy and only discarding it once the value has actually been yielded to the stream, this approach would not work with ~Copyable types. A possible solution would be to modify the .terminated case to include an associated value of type Element.
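The difference between the two cases can be observed with the existing API. The sketch below is a minimal example, assuming the current AsyncStream behavior with a one-element .bufferingOldest buffer. The variable names are chosen for illustration only.

```swift
// A buffer of one element, using the existing `.bufferingOldest` policy.
// `events` must stay alive: releasing the stream would terminate it early.
let (events, eventContinuation) = AsyncStream<String>.makeStream(
    bufferingPolicy: .bufferingOldest(1)
)

eventContinuation.yield("first")                 // fills the buffer
let dropped = eventContinuation.yield("second")  // buffer full: value is handed back

eventContinuation.finish()
let rejected = eventContinuation.yield("third")  // stream terminated: value is lost

if case .dropped(let value) = dropped {
    print("dropped:", value)
}
if case .terminated = rejected {
    print("terminated: no value returned")
}
```

With a Copyable element such as String, the caller can keep its own copy of "third". With a ~Copyable element there would be no way to recover it, which is what an associated value on .terminated would address.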
Provide an onTermination version for the unfolding initializer
In addition to the unfolding initializer’s onCancel handler, which is only invoked when the consumer’s task is cancelled, we could offer an onTermination version that would behave like the continuation-based variant.