Hi Swift Evolution,
Here's a proposal to add new APIs to Async[Throwing]Stream that support
bridging backpressured systems. The proposal PR can be found here.
Introduction
SE-0314 introduced the new Async[Throwing]Stream types, which act as root
asynchronous sequences. These two types allow bridging from synchronous
callbacks such as delegates to an asynchronous sequence. This proposal adds a
new way of constructing asynchronous streams with the goal of bridging
backpressured systems into an asynchronous sequence.
Motivation
After using the AsyncSequence protocol and the Async[Throwing]Stream types
extensively over the past years, we have learned that there are a few important
behavioral details that any AsyncSequence implementation needs to support.
These behaviors are:
- Backpressure
- Multi/single consumer support
- Downstream consumer termination
- Upstream producer termination
In general, AsyncSequence implementations can be divided into two kinds: root
asynchronous sequences that are the source of values, such as
Async[Throwing]Stream, and transformational asynchronous sequences, such as
AsyncMapSequence. Most transformational asynchronous sequences implicitly
fulfill the above behaviors since they forward any demand to an underlying
asynchronous sequence that should implement the behaviors. On the other hand,
root asynchronous sequences need to make sure that all of the above behaviors
are correctly implemented. Let's look at the current behavior of
Async[Throwing]Stream to see if and how it achieves these behaviors.
Backpressure
Root asynchronous sequences need to relay the backpressure to the producing
system. Async[Throwing]Stream aims to support backpressure by providing a
configurable buffer and returning
Async[Throwing]Stream.Continuation.YieldResult, which reports the remaining
buffer capacity, from the yield(_:) method. However, only reporting the buffer
state on yield(_:) is not enough to bridge a backpressured system into an
asynchronous sequence, since this can only be used as a "stop" signal; a signal
to resume production is missing. The only viable backpressure strategy that can
be implemented with the current API is a timed backoff where we stop producing
for some period of time and then speculatively produce again. This pattern is
very inefficient: it causes high latencies and wastes resources.
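To make the limitation concrete, here is a minimal sketch using the existing AsyncStream.makeStream(of:bufferingPolicy:) API (Swift 5.9 and later). The buffer size of 2 and the counter names are arbitrary choices for illustration. The producer can observe that the buffer is full, but nothing tells it when space becomes available again.

```swift
// Sketch: the only feedback the current API gives a producer is the
// YieldResult returned from yield(_:).
let (stream, continuation) = AsyncStream.makeStream(
    of: Int.self,
    bufferingPolicy: .bufferingOldest(2) // arbitrary small buffer for illustration
)

var enqueued = 0
var dropped = 0

for value in 0..<4 {
    switch continuation.yield(value) {
    case .enqueued(let remaining):
        // `remaining` is a hint for how many buffer slots are left.
        enqueued += 1
        print("enqueued \(value), remaining slots: \(remaining)")
    case .dropped(let element):
        // The buffer is full: this is the "stop" signal. There is no
        // matching callback that tells the producer when to resume.
        dropped += 1
        print("dropped \(element)")
    case .terminated:
        print("stream already terminated")
    @unknown default:
        break
    }
}

// Keep the stream referenced for the duration of the sketch.
_ = stream
```

Since nothing consumes the stream here, the first two values fill the buffer and the remaining two are dropped. A real producer would have to guess when to try again.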
Multi/single consumer support
The AsyncSequence protocol itself makes no assumptions about whether the
implementation supports multiple consumers or not. This allows the creation of
unicast and multicast asynchronous sequences. The difference between a unicast
and a multicast asynchronous sequence is whether they allow multiple iterators
to be created. AsyncStream does support the creation of multiple iterators and
it does handle multiple consumers correctly. AsyncThrowingStream, on the other
hand, also supports multiple iterators but calls fatalError when more than one
iterator has to suspend. The original proposal states:
As with any sequence, iterating over an AsyncStream multiple times, or
creating multiple iterators and iterating over them separately, may produce an
unexpected series of values.
While that statement leaves room for any behavior, we learned that a clear
distinction of behavior for root asynchronous sequences is beneficial,
especially when it comes to how transformation algorithms are applied on top.
Downstream consumer termination
Downstream consumer termination allows the producer to notify the consumer that
no more values are going to be produced. Async[Throwing]Stream does support
this by calling the finish() or finish(throwing:) methods of the
Async[Throwing]Stream.Continuation. However, Async[Throwing]Stream does not
handle the case that the Continuation may be deinited before one of the finish
methods is called. This currently leads to async streams that never terminate.
The behavior could be changed, but the change could semantically break existing
code.
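The explicit termination path with today's API can be sketched as follows. Buffered elements are delivered first, and the loop ends only because finish() was called; the variable names are illustrative.

```swift
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

continuation.yield(1)
continuation.yield(2)
// This call is what ends the loop below. As described above, the stream
// does not terminate on its own when the continuation is merely dropped.
continuation.finish()

var received: [Int] = []
for await value in stream {
    received.append(value)
}
print(received)
```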
Upstream producer termination
Upstream producer termination is the inverse of downstream consumer termination
where the producer is notified once the consumption has terminated. Currently,
Async[Throwing]Stream does expose the onTermination property on the
Continuation. The onTermination closure is invoked once the consumer has
terminated. The consumer can terminate in four separate cases:
- The asynchronous sequence was deinited and no iterator was created
- The iterator was deinited and the asynchronous sequence is unicast
- The consuming task is canceled
- The asynchronous sequence returned nil or threw
Async[Throwing]Stream currently invokes onTermination in all cases; however,
since Async[Throwing]Stream supports multiple consumers (as discussed in the
Multi/single consumer support section), a single consumer task being canceled
leads to the termination of all consumers. This is not expected from multicast
asynchronous sequences in general.
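A small sketch of the cancellation case with the current API follows. The task and variable names are illustrative. Once the single consuming task is canceled, the whole stream is terminated and later yields are rejected, which is what affects any other consumer of the same stream.

```swift
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

// Invoked once when the consuming side terminates.
continuation.onTermination = { reason in
    switch reason {
    case .finished:
        print("stream finished")
    case .cancelled:
        print("consuming task was cancelled; stop the producer here")
    @unknown default:
        break
    }
}

let consumer = Task {
    var count = 0
    for await _ in stream { count += 1 }
    return count
}

consumer.cancel()
let consumed = await consumer.value

// The stream is now terminated as a whole, so this yield is rejected.
let result = continuation.yield(1)
if case .terminated = result {
    print("yield after cancellation: terminated")
}
```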
Proposed solution
The above motivation lays out the expected behaviors from a root asynchronous
sequence and compares them to the behaviors of Async[Throwing]Stream. These
are the behaviors where Async[Throwing]Stream diverges from the expectations.
- Backpressure: Doesn't expose a "resumption" signal to the producer
- Multi/single consumer:
  - Divergent implementation between throwing and non-throwing variant
  - Supports multiple consumers even though proposal positions it as a unicast
    asynchronous sequence
- Consumer termination: Doesn't handle the Continuation being deinited
- Producer termination: Happens on first consumer termination
This section proposes new APIs for Async[Throwing]Stream that implement all of
the above-mentioned behaviors.
Creating an AsyncStream with backpressure support
You can create an AsyncStream instance using the new
makeStream(of:backPressureStrategy:) method. This method returns both the
stream and the source. The source can be used to write new values to the
asynchronous stream. The new API specifically provides a
multi-producer/single-consumer pattern.
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4)
)
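This post does not spell out how the .watermark(low:high:) strategy decides when to pause and resume the producer. The following is a hypothetical, self-contained model of one plausible reading: production pauses once the buffer depth reaches the high watermark and resumes once it drops below the low watermark. The type WatermarkModel, its method names, and the exact boundary conditions are assumptions made for illustration, not the proposal's implementation.

```swift
// Hypothetical model of a low/high watermark strategy. Not the proposal's
// implementation; the boundaries (>= high, < low) are assumptions.
struct WatermarkModel {
    let low: Int
    let high: Int
    private(set) var depth = 0
    private(set) var shouldProduceMore = true

    init(low: Int, high: Int) {
        precondition(low <= high, "low watermark must not exceed high watermark")
        self.low = low
        self.high = high
    }

    /// Records `count` written elements and reports whether the producer
    /// should keep producing.
    mutating func didWrite(count: Int) -> Bool {
        depth += count
        if depth >= high { shouldProduceMore = false }
        return shouldProduceMore
    }

    /// Records one consumed element and reports whether the producer
    /// should be producing again.
    mutating func didConsume() -> Bool {
        precondition(depth > 0, "nothing to consume")
        depth -= 1
        if depth < low { shouldProduceMore = true }
        return shouldProduceMore
    }
}

var model = WatermarkModel(low: 2, high: 4)
let afterThree = model.didWrite(count: 3)  // depth 3, below high
let afterFour = model.didWrite(count: 1)   // depth 4, reached high
let afterFirstRead = model.didConsume()    // depth 3, still paused
let afterSecondRead = model.didConsume()   // depth 2, still paused
let afterThirdRead = model.didConsume()    // depth 1, below low
```

In this model, production does not resume the moment a single element is consumed. The gap between the two watermarks avoids rapidly toggling the producer on and off.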
The new proposed APIs offer three different ways to bridge a backpressured
system. The foundation is the multi-step synchronous interface. Below is an
example of how it can be used:
do {
    let writeResult = try source.write(contentsOf: sequence)
    switch writeResult {
    case .produceMore:
        // Trigger more production
        break
    case .enqueueCallback(let callbackToken):
        source.enqueueCallback(callbackToken: callbackToken, onProduceMore: { result in
            switch result {
            case .success:
                // Trigger more production
                break
            case .failure(let error):
                // Terminate the underlying producer
                break
            }
        })
    }
} catch {
    // `write(contentsOf:)` throws when the asynchronous stream has already terminated
}
The above API offers the most control when bridging a synchronous producer to an
asynchronous sequence. First, you have to write values using the
write(contentsOf:) method, which returns a WriteResult. The result either
indicates that more values should be produced or that a callback should be
enqueued by calling the enqueueCallback(callbackToken:onProduceMore:) method.
This callback is invoked once the backpressure strategy has decided that more
values should be produced. This API aims to offer the most flexibility with the
greatest performance. The callback only has to be allocated in the case where
the producer needs to be suspended.
Additionally, the above API is the building block for some higher-level and
easier-to-use APIs to write values to the asynchronous stream. Below is an
example of the two higher-level APIs.
// Writing new values and providing a callback when to produce more
try source.write(contentsOf: sequence, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
        break
    case .failure(let error):
        // Terminate the underlying producer
        break
    }
})
// This method suspends until more values should be produced
try await source.write(contentsOf: sequence)
With the above APIs, we should be able to effectively bridge any system into an
asynchronous stream, regardless of whether the system is callback-based,
blocking or asynchronous.
Detailed design, API/ABI compatibility, future directions, alternatives
Please take a look at the evolution PR which includes the full proposal.
Looking forward to hearing what everyone thinks about this!
Franz