Hey everyone,
I just got around to updating my open PR in swift-async-algorithms to add a new MultiProducerSingleConsumerAsyncChannel. This PR is a follow-up to my previous Swift Evolution proposal SE-0406 Backpressure support for AsyncStream. Below is a copy of the motivation and proposed solution sections of the pitch.
MultiProducerSingleConsumerAsyncChannel
- Proposal: SAA-0016
- Authors: Franz Busch
- Status: Implemented
Revision
- 2023/12/18: Migrate proposal from Swift Evolution to Swift Async Algorithms.
- 2023/12/19: Add element size dependent strategy
- 2024/05/19: Rename to multi producer single consumer channel
- 2024/05/28: Add unbounded strategy
- 2025/03/24: Adopt ~Copyable for correct semantics and better performance.
Introduction
SE-0314 introduced new Async[Throwing]Stream types which act as root asynchronous sequences. These two types allow bridging from synchronous callbacks such as delegates to an asynchronous sequence. This proposal adds a new root primitive with the goal of modeling asynchronous multi-producer-single-consumer systems.
Motivation
After using the AsyncSequence protocol, the Async[Throwing]Stream types, and the Async[Throwing]Channel types extensively over the past years, we learned that there is a gap in the ecosystem for a type that provides strict multi-producer-single-consumer guarantees with external backpressure support. Additionally, any stream- or channel-like type needs to have a clear definition of the following behaviors:
- Backpressure
- Multi/single consumer support
- Downstream consumer termination
- Upstream producer termination
The sections below explain each of these in detail.
Backpressure
In general, backpressure is the mechanism that prevents a fast producer from
overwhelming a slow consumer. It improves the stability of the overall system by
regulating the flow of data between different components. Additionally, it
makes it possible to put an upper bound on the resource consumption of a system. In practice,
backpressure is used in almost all networked applications.
In Swift, asynchronous sequences also have the concept of internal backpressure. This is modeled by the pull-based implementation where a consumer has to call next on the AsyncIterator. In this model, there is no way for a producer to overwhelm a consumer since the consumer controls the rate of pulling elements.
However, the internal backpressure of an asynchronous sequence isn't the only backpressure in play. There is also the backpressure of the source that produces the actual elements. For a backpressured system, it is important that every component is aware of the backpressure of its consumer and its producer.
Let's take a quick look at how our current root asynchronous sequences handle this.
Async[Throwing]Stream aims to support backpressure by providing a configurable buffer and returning Async[Throwing]Stream.Continuation.YieldResult, which contains the current buffer depth, from the yield() method. However, only providing the current buffer depth on yield() is not enough to bridge a backpressured system into an asynchronous sequence, since this can only be used as a "stop" signal; we are missing a signal to indicate that production should resume. The only viable backpressure strategy that can be implemented with the current API is a timed backoff where we stop producing for some period of time and then speculatively produce again. This is a very inefficient pattern that produces high latencies and wastes resources.
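To make the limitation concrete, here is a minimal sketch of the timed-backoff workaround using today's AsyncStream. The helper names produceWithTimedBackoff and runBackoffDemo, the buffer limit of 2, and the 10 ms pause are illustrative assumptions and not part of the proposal. It assumes Swift 5.9 or later for makeStream.

```swift
// Hypothetical helper: yields `values` into an AsyncStream and falls back to
// a timed backoff whenever the buffer is full.
func produceWithTimedBackoff(
    _ values: [Int],
    into continuation: AsyncStream<Int>.Continuation
) async {
    for value in values {
        retry: while true {
            switch continuation.yield(value) {
            case .enqueued:
                // The element was accepted; move on to the next one.
                break retry
            case .dropped:
                // The buffer is full. There is no signal telling us when to
                // resume, so we guess: wait 10 ms (arbitrary) and try again.
                try? await Task.sleep(nanoseconds: 10_000_000)
            case .terminated:
                // The consumer is gone; stop producing.
                return
            @unknown default:
                return
            }
        }
    }
    continuation.finish()
}

// Hypothetical driver: runs the producer concurrently and collects everything
// the consumer receives.
func runBackoffDemo() async -> [Int] {
    // `.bufferingOldest(2)` rejects new elements once two are buffered, so a
    // `.dropped` result hands back the element we just tried to yield.
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingOldest(2)
    )
    Task { await produceWithTimedBackoff(Array(1...5), into: continuation) }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Every element eventually arrives, but each full buffer costs the producer at least one sleep interval, and a shorter interval only trades latency for busy polling.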
Async[Throwing]Channel is a multi-producer-multi-consumer channel that only
supports asynchronous producers. Additionally, the backpressure strategy is
fixed by a buffer size of 1 element per producer.
We are currently lacking a type that supports a configurable backpressure
strategy and both asynchronous and synchronous producers.
Multi/single consumer support
The AsyncSequence protocol itself makes no assumptions about whether the implementation supports multiple consumers or not. This allows the creation of unicast and multicast asynchronous sequences. The difference between a unicast and a multicast asynchronous sequence is whether they allow multiple iterators to be created. AsyncStream does support the creation of multiple iterators and it does handle multiple consumers correctly. On the other hand, AsyncThrowingStream also supports multiple iterators but calls fatalError when more than one iterator has to suspend. The original proposal states:
As with any sequence, iterating over an AsyncStream multiple times, or
creating multiple iterators and iterating over them separately, may produce an
unexpected series of values.
While that statement leaves room for any behavior, we learned that a clear distinction of behavior for root asynchronous sequences is beneficial, especially when it comes to how transformation algorithms are applied on top.
Downstream consumer termination
Downstream consumer termination allows the producer to notify the consumer that no more values are going to be produced. Async[Throwing]Stream does support this by calling the finish() or finish(throwing:) methods of the Async[Throwing]Stream.Continuation. However, Async[Throwing]Stream does not handle the case that the Continuation may be deinited before one of the finish methods is called. This currently leads to async streams that never terminate.
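For reference, explicit downstream termination with today's AsyncStream looks roughly like the sketch below. The function name collectUntilFinished is a hypothetical helper for illustration, and the sketch assumes Swift 5.9 or later for makeStream. The loop ends only because finish() is called; without that call the consumer would suspend forever.

```swift
// Hypothetical helper: yields two values, finishes the stream, and returns
// everything the consumer saw before termination.
func collectUntilFinished() async -> [Int] {
    // The default buffering policy is unbounded, so both yields are buffered.
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    continuation.yield(1)
    continuation.yield(2)
    // Signal the consumer that no more values will be produced.
    continuation.finish()

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    // Reached only because the stream was finished.
    return received
}
```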
Upstream producer termination
Upstream producer termination is the inverse of downstream consumer termination where the producer is notified once the consumption has terminated. Currently, Async[Throwing]Stream does expose the onTermination property on the Continuation. The onTermination closure is invoked once the consumer has terminated. The consumer can terminate in four separate cases:
- The asynchronous sequence was deinited and no iterator was created
- The iterator was deinited and the asynchronous sequence is unicast
- The consuming task is canceled
- The asynchronous sequence returned nil or threw
Async[Throwing]Stream currently invokes onTermination in all cases; however, since Async[Throwing]Stream supports multiple consumers (as discussed in the Multi/single consumer support section), a single consumer task being canceled leads to the termination of all consumers. This is not expected from multicast asynchronous sequences in general.
Proposed solution
The above motivation lays out the expected behaviors for any consumer/producer system and compares them to the behaviors of Async[Throwing]Stream and Async[Throwing]Channel.
This section proposes a new type called MultiProducerSingleConsumerAsyncChannel that implements all of the above-mentioned behaviors. Importantly, this proposed solution takes advantage of ~Copyable types to model the multi-producer-single-consumer behavior. While the current AsyncSequence protocols do not support ~Copyable types, we provide a way to convert the proposed channel to an asynchronous sequence. This leaves us room to support any potential future asynchronous streaming protocol that supports ~Copyable.
Creating a MultiProducerSingleConsumerAsyncChannel
You can create a MultiProducerSingleConsumerAsyncChannel instance using the makeChannel(of:backpressureStrategy:) method. This method returns the channel and the source. The source can be used to send new values to the asynchronous channel. The new API specifically provides a multi-producer/single-consumer pattern.
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
// The channel and source can be extracted from the returned type
let channel = consume channelAndSource.channel
let source = consume channelAndSource.source
The new proposed APIs offer two different backpressure strategies:
- Watermark: Using a low and high watermark
- Unbounded: Unbounded buffering of the channel. Only use this if the
production is limited through some other means.
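As a rough intuition for the watermark strategy, here is a toy model in plain Swift. It is only a sketch under my own assumptions: the type WatermarkModel, its method names, and the exact thresholds (pause once the buffer depth reaches high, resume once it drops below low) are illustrative and are not the channel's actual implementation.

```swift
// Toy model of a high/low watermark strategy. All names are hypothetical.
struct WatermarkModel {
    let low: Int
    let high: Int
    private(set) var depth = 0
    private(set) var shouldProduce = true

    init(low: Int, high: Int) {
        precondition(low <= high, "low watermark must not exceed high watermark")
        self.low = low
        self.high = high
    }

    /// Records that `count` elements were buffered.
    /// Returns whether the producer should keep producing.
    mutating func didSend(count: Int) -> Bool {
        depth += count
        if depth >= high {
            // Assumption: reaching the high watermark pauses production.
            shouldProduce = false
        }
        return shouldProduce
    }

    /// Records that the consumer took one element.
    /// Returns whether the producer should produce (again).
    mutating func didConsume() -> Bool {
        precondition(depth > 0, "nothing to consume")
        depth -= 1
        if depth < low {
            // Assumption: dropping below the low watermark resumes production.
            shouldProduce = true
        }
        return shouldProduce
    }
}

var demo = WatermarkModel(low: 2, high: 4)
_ = demo.didSend(count: 3) // true: depth 3 is still below the high watermark
_ = demo.didSend(count: 1) // false: depth 4 reached the high watermark
_ = demo.didConsume()      // false: depth 3
_ = demo.didConsume()      // false: depth 2 is not yet below the low watermark
_ = demo.didConsume()      // true: depth 1 dropped below the low watermark
```

The gap between the two watermarks keeps the producer from flipping between paused and resumed on every single element.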
The source is used to send values to the channel. It provides different APIs for
synchronous and asynchronous producers. All of the APIs relay the
backpressure of the channel. The synchronous multi-step APIs are the foundation for all other APIs. Below is an example of how they can be used:
do {
    let sendResult = try source.send(contentsOf: sequence)
    switch sendResult {
    case .produceMore:
        // Trigger more production in the underlying system
        break
    case .enqueueCallback(let callbackToken):
        // There are enough values in the channel already. We need to enqueue
        // a callback to get notified when we should produce more.
        source.enqueueCallback(callbackToken: callbackToken, onProduceMore: { result in
            switch result {
            case .success:
                // Trigger more production in the underlying system
                break
            case .failure(let error):
                // Terminate the underlying producer
                break
            }
        })
    }
} catch {
    // `send(contentsOf:)` throws if the channel already terminated
}
The above API offers the most control and highest performance when bridging a synchronous producer to a MultiProducerSingleConsumerAsyncChannel. First, you send values using send(contentsOf:), which returns a SendResult. The result either indicates that more values should be produced or that a callback should be enqueued by calling the enqueueCallback(callbackToken:onProduceMore:) method. This callback is invoked once the backpressure strategy has decided that more values should be produced. The callback only has to be allocated in the case where the producer needs to pause production.
Additionally, the above API is the building block for some higher-level and easier-to-use APIs to send values to the channel. Below is an example of the two higher-level APIs.
// Writing new values and providing a callback when to produce more
try source.send(contentsOf: sequence, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
        break
    case .failure(let error):
        // Terminate the underlying producer
        break
    }
})
// This method suspends until more values should be produced
try await source.send(contentsOf: sequence)
With the above APIs, we should be able to effectively bridge any system into a MultiProducerSingleConsumerAsyncChannel regardless of whether the system is callback-based, blocking, or asynchronous.
Multi producer
To support multiple producers, the source offers a copy method to produce a new source. The new source is returned as sending, so it is in a disconnected isolation region from the original source and can be passed into a different isolation region to produce elements concurrently.
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 5)
)
var channel = consume channelAndSource.channel
var source1 = consume channelAndSource.source
var source2 = source1.copy()
// `group` is assumed to be an enclosing throwing task group
group.addTask {
    try await source1.send(1)
}
group.addTask {
    try await source2.send(2)
}
print(await channel.next()) // Prints Optional(1) or Optional(2) depending on which child task runs first
print(await channel.next()) // Prints the remaining value
Downstream consumer termination
When reading the next two examples around termination behaviour, keep in mind
that the newly proposed APIs provide a strict single-consumer channel.
Calling finish() terminates the downstream consumer. Below is an example of
this:
// Termination through calling finish
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
try await source.send(1)
source.finish()
print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints nil
If the channel has a failure type it can also be finished with an error.
// Termination through calling finish(throwing:)
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    throwing: SomeError.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
try await source.send(1)
source.finish(throwing: SomeError())
print(try await channel.next()) // Prints Optional(1)
print(try await channel.next()) // Throws SomeError
The other way to terminate the consumer is by deiniting the source. This has the same effect as calling finish(). Since the source is a ~Copyable type, this will happen automatically when the source is last used or explicitly consumed.
// Termination through deiniting the source
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
try await source.send(1)
_ = consume source // Explicitly consume the source
print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints nil
Upstream producer termination
The producer will get notified about termination through the callback registered with setOnTerminationCallback. Termination of the producer happens in the following scenarios:
// Termination through task cancellation
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }
let task = Task {
    await channel.next()
}
task.cancel() // Prints Terminated
// Termination through deiniting the channel
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }
_ = consume channel // Prints Terminated
// Termination through finishing the source and consuming the last element
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }
_ = try await source.send(1)
source.finish()
print(await channel.next()) // Prints Optional(1)
await channel.next() // Prints Terminated
// Termination through deiniting the last source and consuming the last element
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source1 = consume channelAndSource.source
var source2 = source1.copy()
source1.setOnTerminationCallback { print("Terminated") }
_ = try await source1.send(1)
_ = consume source1
_ = try await source2.send(2)
print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints Optional(2)
_ = consume source2
await channel.next() // Prints Terminated
As with downstream consumer termination, trying to send more elements after the producer has been terminated results in an error thrown from the send methods.
Rest of the proposal
The rest of the proposal including the detailed design, future directions and alternatives considered sections can be found in the PR.
Looking forward to hearing what everyone thinks about this!