[Pitch] MultiProducerSingleConsumerAsyncChannel

Hey everyone,

I just got around to updating my open PR in swift-async-algorithms to add a new MultiProducerSingleConsumerAsyncChannel. This PR is a follow-up to my previous Swift Evolution proposal SE-0406 Backpressure support for AsyncStream. Below is a copy of the motivation and proposed solution sections of the pitch.

MultiProducerSingleConsumerAsyncChannel

Revision

  • 2023/12/18: Migrate proposal from Swift Evolution to Swift Async Algorithms.
  • 2023/12/19: Add element size dependent strategy
  • 2024/05/19: Rename to multi producer single consumer channel
  • 2024/05/28: Add unbounded strategy
  • 2025/03/24: Adopt ~Copyable for correct semantics and better performance.

Introduction

SE-0314 introduced new Async[Throwing]Stream types which act as root asynchronous sequences. These two types allow bridging from synchronous callbacks such as delegates to an asynchronous sequence. This proposal adds a new root primitive with the goal to model asynchronous multi-producer-single-consumer systems.

Motivation

After using the AsyncSequence protocol, the Async[Throwing]Stream types, and the Async[Throwing]Channel types extensively over the past years, we learned that there is a gap in the ecosystem for a type that provides strict multi-producer-single-consumer guarantees with external backpressure support. Additionally, any stream-like or channel-like type needs a clear definition of the following behaviors:

  1. Backpressure
  2. Multi/single consumer support
  3. Downstream consumer termination
  4. Upstream producer termination

The sections below explain each of these in detail.

Backpressure

In general, backpressure is the mechanism that prevents a fast producer from
overwhelming a slow consumer. It improves the stability of the overall system by
regulating the flow of data between different components. Additionally, it
makes it possible to put an upper bound on the resource consumption of a system.
In practice, backpressure is used in almost all networked applications.

In Swift, asynchronous sequences also have the concept of internal backpressure. This is modeled by the pull-based implementation, where a consumer has to call next on the AsyncIterator. In this model, there is no way for a producer to overwhelm a consumer, since the consumer controls the rate at which elements are pulled.

However, the internal backpressure of an asynchronous sequence isn't the only backpressure in play. There is also the backpressure of the source that produces the actual elements. For a backpressured system, it is important that every component is aware of the backpressure of its consumer and its producer.

Let's take a quick look at how our current root asynchronous sequences handle this.

Async[Throwing]Stream aims to support backpressure by providing a configurable buffer and by returning an Async[Throwing]Stream.Continuation.YieldResult, which contains the current buffer depth, from the yield() method. However, only providing the current buffer depth on yield() is not enough to bridge a backpressured system into an asynchronous sequence: it can only be used as a "stop" signal, and there is no signal that indicates when production should resume. The only viable backpressure strategy that can be implemented with the current API is a timed backoff, where we stop producing for some period of time and then speculatively produce again. This pattern is inefficient: it adds latency and wastes resources.
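
To make the missing "resume" signal concrete, here is a minimal sketch against today's AsyncStream. It assumes Swift 5.9 or later for makeStream(of:bufferingPolicy:); the buffer size of 2 and the loop bounds are arbitrary choices for illustration. The producer can see that the buffer is full, but nothing tells it when space frees up again.

```swift
// A bounded buffer of two elements; once full, newly yielded elements are dropped.
let (stream, continuation) = AsyncStream.makeStream(
    of: Int.self,
    bufferingPolicy: .bufferingOldest(2)
)

var accepted = 0
var dropped = 0

for value in 1...4 {
    let result = continuation.yield(value)
    if case .enqueued = result {
        // The element was buffered; the payload reports the remaining capacity.
        accepted += 1
    } else if case .dropped = result {
        // The buffer is full. This is the only "stop" signal the producer gets;
        // there is no callback telling it when to resume.
        dropped += 1
    }
}

// Nothing consumes the stream in this sketch; it only keeps the buffer alive.
_ = stream
print("accepted: \(accepted), dropped: \(dropped)")
```

A producer that wants to continue after a dropped result has to guess when to retry, which is the timed backoff described above.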

Async[Throwing]Channel is a multi-producer-multi-consumer channel that only
supports asynchronous producers. Additionally, the backpressure strategy is
fixed by a buffer size of 1 element per producer.

We are currently lacking a type that supports a configurable backpressure
strategy and both asynchronous and synchronous producers.

Multi/single consumer support

The AsyncSequence protocol itself makes no assumptions about whether the implementation supports multiple consumers or not. This allows the creation of unicast and multicast asynchronous sequences. The difference between a unicast and a multicast asynchronous sequence is whether they allow multiple iterators to be created. AsyncStream supports the creation of multiple iterators and handles multiple consumers correctly. On the other hand, AsyncThrowingStream also supports multiple iterators but calls fatalError when more than one iterator has to suspend. The original proposal states:

As with any sequence, iterating over an AsyncStream multiple times, or
creating multiple iterators and iterating over them separately, may produce an
unexpected series of values.

While that statement leaves room for any behavior, we learned that a clear distinction of behavior for root asynchronous sequences is beneficial, especially when it comes to how transformation algorithms are applied on top.
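
As a rough illustration of what multiple consumers look like with today's AsyncStream, the sketch below lets two child tasks iterate the same stream. It assumes Swift 5.9 or later for makeStream(of:); the element values and the number of consumers are arbitrary. Which consumer receives which element is not specified, so the sketch only looks at the combined result.

```swift
// Buffer four elements up front and finish, so both consumers race over a fixed set.
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
for value in 1...4 {
    continuation.yield(value)
}
continuation.finish()

// Two consumers iterate the same stream concurrently.
let received = await withTaskGroup(of: [Int].self, returning: [Int].self) { group in
    for _ in 0..<2 {
        group.addTask {
            var values: [Int] = []
            for await value in stream {
                values.append(value)
            }
            return values
        }
    }

    var all: [Int] = []
    for await values in group {
        all.append(contentsOf: values)
    }
    return all
}

// Four elements in total: each one went to exactly one of the two consumers.
print(received.sorted())
```

Elements are distributed between the consumers rather than delivered to each of them, which is one of several possible meanings of "multiple consumers".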

Downstream consumer termination

Downstream consumer termination allows the producer to notify the consumer that no more values are going to be produced. Async[Throwing]Stream supports this through the finish() and finish(throwing:) methods of the Async[Throwing]Stream.Continuation. However, Async[Throwing]Stream does not handle the case where the Continuation is deinited before one of the finish methods is called. This currently leads to async streams that never terminate.

Upstream producer termination

Upstream producer termination is the inverse of downstream consumer termination where the producer is notified once the consumption has terminated. Currently, Async[Throwing]Stream does expose the onTermination property on the Continuation. The onTermination closure is invoked once the consumer has terminated. The consumer can terminate in four separate cases:

  1. The asynchronous sequence was deinited and no iterator was created
  2. The iterator was deinited and the asynchronous sequence is unicast
  3. The consuming task is canceled
  4. The asynchronous sequence returned nil or threw

Async[Throwing]Stream currently invokes onTermination in all cases; however, since Async[Throwing]Stream supports multiple consumers (as discussed in the Multi/single consumer support section), a single consumer task being canceled leads to the termination of all consumers. This is not expected from multicast asynchronous sequences in general.
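
For reference, this is roughly how upstream termination is observed with today's AsyncStream when the consuming task is canceled. It is a sketch assuming Swift 5.9 or later; TerminationLog is a hypothetical helper that exists only so the @Sendable callback can record what it saw.

```swift
import Foundation

// Hypothetical lock-protected box used to record callback invocations.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func append(_ entry: String) {
        lock.lock()
        defer { lock.unlock() }
        entries.append(entry)
    }

    var snapshot: [String] {
        lock.lock()
        defer { lock.unlock() }
        return entries
    }
}

let log = TerminationLog()
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

// The producer side registers interest in the consumer going away.
continuation.onTermination = { termination in
    if case .cancelled = termination {
        log.append("cancelled")
    } else {
        log.append("finished")
    }
}

// The consumer is canceled before it receives anything.
let consumer = Task {
    for await _ in stream {}
}
consumer.cancel()
await consumer.value

print(log.snapshot)
```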

Proposed solution

The above motivation lays out the expected behaviors for any consumer/producer system and compares them to the behaviors of Async[Throwing]Stream and Async[Throwing]Channel.

This section proposes a new type called MultiProducerSingleConsumerAsyncChannel that implements all of the above-mentioned behaviors. Importantly, the proposed solution takes advantage of ~Copyable types to model the multi-producer-single-consumer behavior. While the current AsyncSequence protocols do not support ~Copyable types, we provide a way to convert the proposed channel to an asynchronous sequence. This leaves us room to support any potential future asynchronous streaming protocol that supports ~Copyable.

Creating a MultiProducerSingleConsumerAsyncChannel

You can create a MultiProducerSingleConsumerAsyncChannel instance using the makeChannel(of:backpressureStrategy:) method. This method returns the channel and the source. The source can be used to send new values to the asynchronous channel. The new API specifically provides a multi-producer/single-consumer pattern.

let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

// The channel and source can be extracted from the returned type
let channel = consume channelAndSource.channel
let source = consume channelAndSource.source

The new proposed APIs offer two different backpressure strategies:

  • Watermark: Using a low and high watermark
  • Unbounded: Unbounded buffering of the channel. Only use this if the
    production is limited through some other means.
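
To give a feel for how a low/high watermark can drive production, here is a small self-contained sketch. WatermarkSketch and its method names are hypothetical, and the exact threshold comparisons are an assumption rather than the behavior of the proposed implementation.

```swift
// Hypothetical sketch type; the names and exact thresholds are assumptions.
struct WatermarkSketch {
    let low: Int
    let high: Int
    private(set) var buffered = 0

    init(low: Int, high: Int) {
        precondition(low <= high, "low must not exceed high")
        self.low = low
        self.high = high
    }

    /// Records newly buffered elements. Returns true while the producer may continue.
    mutating func didSend(count: Int) -> Bool {
        buffered += count
        return buffered < high
    }

    /// Records one consumed element. Returns true once production should resume.
    mutating func didConsume() -> Bool {
        buffered -= 1
        return buffered < low
    }
}

var strategy = WatermarkSketch(low: 2, high: 4)
let afterTwoSent = strategy.didSend(count: 2)   // 2 buffered: below high, keep producing
let afterFourSent = strategy.didSend(count: 2)  // 4 buffered: high reached, pause
let afterOneRead = strategy.didConsume()        // 3 buffered: still paused
let afterTwoReads = strategy.didConsume()       // 2 buffered: still paused
let afterThreeReads = strategy.didConsume()     // 1 buffered: below low, resume
```

The gap between the two watermarks avoids toggling the producer on and off for every single element.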

The source is used to send values to the channel. It provides different APIs for
synchronous and asynchronous producers. All of the APIs relay the
backpressure of the channel. The synchronous multi-step APIs are the foundation for all other APIs. Below is an example of how they can be used:

do {
    let sendResult = try source.send(contentsOf: sequence)
    
    switch sendResult {
    case .produceMore:
        // Trigger more production in the underlying system
    
    case .enqueueCallback(let callbackToken):
        // There are enough values in the channel already. We need to enqueue
        // a callback to get notified when we should produce more.
        source.enqueueCallback(callbackToken: callbackToken, onProduceMore: { result in
            switch result {
            case .success:
                // Trigger more production in the underlying system
            case .failure(let error):
                // Terminate the underlying producer
            }
        })
    }
} catch {
    // `send(contentsOf:)` throws if the channel already terminated
}

The above API offers the most control and the highest performance when bridging a synchronous producer to a MultiProducerSingleConsumerAsyncChannel. First, you send values using send(contentsOf:), which returns a SendResult. The result indicates either that more values should be produced or that a callback should be enqueued by calling the enqueueCallback(callbackToken:onProduceMore:) method. This callback is invoked once the backpressure strategy decides that more values should be produced. The callback only has to be allocated when the producer needs to pause production.
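
The two-step flow can be sketched with a toy, single-threaded model. Everything here (ChannelSketch, SendResultSketch, the Int token, the threshold checks) is hypothetical and only mirrors the shape of the API described above; it is not the proposed implementation and it is not thread-safe.

```swift
// Hypothetical, single-threaded sketch; not the proposed implementation.
enum SendResultSketch {
    case produceMore
    case enqueueCallback(token: Int)
}

final class ChannelSketch {
    private let low: Int
    private let high: Int
    private var buffer: [Int] = []
    private var nextToken = 0
    private var callbacks: [Int: () -> Void] = [:]

    init(low: Int, high: Int) {
        self.low = low
        self.high = high
    }

    /// Step 1: buffer the elements and report whether production may continue.
    func send(contentsOf elements: [Int]) -> SendResultSketch {
        buffer.append(contentsOf: elements)
        guard buffer.count >= high else { return .produceMore }
        nextToken += 1
        return .enqueueCallback(token: nextToken)
    }

    /// Step 2: only needed (and only allocated) when the producer has to pause.
    func enqueueCallback(callbackToken: Int, onProduceMore: @escaping () -> Void) {
        if buffer.count < low {
            onProduceMore() // Demand already returned in the meantime.
        } else {
            callbacks[callbackToken] = onProduceMore
        }
    }

    /// Consumer side: hand out one element and resume producers below the low watermark.
    func next() -> Int? {
        guard !buffer.isEmpty else { return nil }
        let element = buffer.removeFirst()
        if buffer.count < low {
            let pending = Array(callbacks.values)
            callbacks.removeAll()
            pending.forEach { $0() }
        }
        return element
    }
}

func runMultiStepSketch() -> (resumedEarly: Bool, resumedLate: Bool) {
    let channel = ChannelSketch(low: 2, high: 4)
    var resumed = false

    if case .enqueueCallback(let token) = channel.send(contentsOf: [1, 2, 3, 4]) {
        channel.enqueueCallback(callbackToken: token) { resumed = true }
    }

    _ = channel.next() // 3 buffered
    _ = channel.next() // 2 buffered
    let resumedEarly = resumed
    _ = channel.next() // 1 buffered, below low: the callback fires
    return (resumedEarly, resumed)
}

let multiStepOutcome = runMultiStepSketch()
print(multiStepOutcome)
```

Splitting the send from the callback registration means the fast path, where the channel still wants more elements, never has to create a closure.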

Additionally, the above API is the building block for some higher-level and easier-to-use APIs to send values to the channel. Below is an example of the two higher-level APIs.

// Writing new values and providing a callback when to produce more
try source.send(contentsOf: sequence, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
    case .failure(let error):
        // Terminate the underlying producer
    }
})

// This method suspends until more values should be produced
try await source.send(contentsOf: sequence)

With the above APIs, we should be able to effectively bridge any system into a
MultiProducerSingleConsumerAsyncChannel, regardless of whether the system is callback-based, blocking, or asynchronous.

Multi producer

To support multiple producers, the source offers a copy method that produces a new source. The new source is returned as sending, so it is in a disconnected isolation region from the original source. This allows passing it into a different isolation region to produce elements concurrently.

let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 5)
)
var channel = consume channelAndSource.channel
var source1 = consume channelAndSource.source
var source2 = source1.copy()

group.addTask {
    try await source1.send(1)
}

group.addTask {
    try await source2.send(2)
}

print(await channel.next()) // Prints either 1 or 2 depending on which child task runs first
print(await channel.next()) // Prints either 1 or 2 depending on which child task runs first

Downstream consumer termination

When reading the next two examples around termination behaviour, keep in mind
that the newly proposed APIs provide a strictly single-consumer channel.

Calling finish() terminates the downstream consumer. Below is an example of
this:

// Termination through calling finish
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source

try await source.send(1)
source.finish()

print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints nil

If the channel has a failure type it can also be finished with an error.

// Termination through calling finish(throwing:)
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    throwing: SomeError.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source

try await source.send(1)
source.finish(throwing: SomeError())

print(try await channel.next()) // Prints Optional(1)
print(try await channel.next()) // Throws SomeError

The other way to terminate the consumer is by deiniting the source. This has the same effect as calling finish(). Since the source is a ~Copyable type, this happens automatically when the source is last used or explicitly consumed.

// Termination through deiniting the source
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source

try await source.send(1)
_ = consume source // Explicitly consume the source

print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints nil
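
The interplay of copy() and deinit-based termination can be sketched with a tiny ~Copyable type. SourceSketch and SharedStateSketch are hypothetical names, the shared state is deliberately not thread-safe, and the sketch assumes Swift 5.9 or later for noncopyable structs; it only illustrates the idea that the channel finishes once the last source is gone.

```swift
// Hypothetical shared state; a real implementation would need synchronization.
final class SharedStateSketch {
    var activeSources = 0
    var isFinished = false
}

struct SourceSketch: ~Copyable {
    private let state: SharedStateSketch

    init(state: SharedStateSketch) {
        self.state = state
        state.activeSources += 1
    }

    /// Explicitly creates another handle to the same channel state.
    func copy() -> SourceSketch {
        SourceSketch(state: state)
    }

    deinit {
        state.activeSources -= 1
        if state.activeSources == 0 {
            // The last source is gone: behave as if finish() had been called.
            state.isFinished = true
        }
    }
}

func runSourceSketch() -> (afterFirst: Bool, afterLast: Bool) {
    let state = SharedStateSketch()
    let source1 = SourceSketch(state: state)
    let source2 = source1.copy()

    _ = consume source1 // One source remains, so the channel stays open.
    let afterFirst = state.isFinished
    _ = consume source2 // The last source is consumed, so the channel finishes.
    let afterLast = state.isFinished
    return (afterFirst, afterLast)
}

let sourceOutcome = runSourceSketch()
print(sourceOutcome)
```

Because the type cannot be copied implicitly, every additional producer is visible in the code as an explicit copy() call, and the end of each producer's lifetime is well defined.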

Upstream producer termination

The producer is notified about termination through the onTermination callback,
which is set with setOnTerminationCallback. Termination of the producer happens in the following scenarios:

// Termination through task cancellation
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }

let task = Task {
    await channel.next()
}
task.cancel() // Prints Terminated

// Termination through deiniting the channel
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }
_ = consume channel // Prints Terminated

// Termination through finishing the source and consuming the last element
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }

_ = try await source.send(1)
source.finish()

print(await channel.next()) // Prints Optional(1)
await channel.next() // Prints Terminated

// Termination through deiniting the last source and consuming the last element
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source1 = consume channelAndSource.source
var source2 = source1.copy()
source1.setOnTerminationCallback { print("Terminated") }

_ = try await source1.send(1)
_ = consume source1
_ = try await source2.send(2)

print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints Optional(2)
_ = consume source2
await channel.next() // Prints Terminated

Similar to the downstream consumer termination, trying to send more elements after the producer has been terminated will result in an error thrown from the send methods.

Rest of the proposal

The rest of the proposal including the detailed design, future directions and alternatives considered sections can be found in the PR.

Looking forward to hearing what everyone thinks about this!


I'm very eager for this! I'd love to see typed throws support in this type as well. I think it's only missing when an error is thrown because a send was called during termination. We could easily expose that TerminationError (or whatever its name is) in the signature.

Small sidenote, the proposal in the PR has a formatting issue at:

swift-nio: NIOAsyncSequenceProducer


It's great to see more high quality tools for "proper" data flow arrive. The current state around Async[Throwing]Stream frankly feels quite dodgy. The more easy-to-use and correct-by-default building blocks we have at the ready, the better!

I am not a fan of the consume and copy dance, it does not feel swifty...

let channelAndSource = MultiProducerSingleConsumerChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel // meh...
var source1 = consume channelAndSource.source // meh...
var source2 = source1.copy() // meh...

Couldn't we get the same non-copyable/ownership stuff going by something like this:

let channel = MultiProducerSingleConsumerChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var source1 = channel.makeSource()
var source2 = channel.makeSource()

await channel.next()
_ = consume channel

I understand where you are coming from but I think for such an important primitive it is more important to be correct and as performant as possible. ~Copyable types are giving us a great performance edge here over the previous class backed approaches in other AsyncSequences. The consumes that you called out when tearing apart the channelAndSource are only needed right now because tuples don't support ~Copyable types. Otherwise, it might be possible to create a makeChannel method that returns a tuple with those types.

In this short example, this might look like a good approach but in reality the channel is passed to an entirely different system than the source. Since the types are both ~Copyable we must allow the source to create additional sources. Adding this to the Channel will in practice often cause problems since you don't have it in the producing system.


Yeah, I thought as much, and I understand what this is trying to achieve. It just looks ugly (objective true facts ; ) and I feel there is a nicer API possible here somehow. But a ~Copyable tuple will help a lot.

This is what feels most out of place for me:

var source2 = source1.copy()

I am not convinced that a "source" should also be a "sourceMaker", if you know what I mean :thinking:.

I think what would help to get a better feel for this is to see the intended API for a "multi-cast" type channel. Clearly, calling .copy() on the receiving bit would be funky. So there will have to be some "thing" that vends the various receiving bits on-demand. I am not saying there necessarily needs to be symmetry, but I don't really see it all clicking together nicely yet.


Small typo I guess, in the title and here it is called MultiProducerSingleConsumerAsyncChannel (emphasis Async) and in the implementation it is called MultiProducerSingleConsumerChannel


Thanks. Don't know how I missed this. Updated the PR to MultiProducerSingleConsumerAsyncChannel.


This is a great proposal which also helped me grasp the underlying concepts of this entire area a lot better.
The only minor thing is indeed the mentioned source.copy() methods that feel a bit... surprising? In the sense that explicitly copying things in Swift feels weird. On the other hand I do understand the reason, and I also think that it's not unlikely to want to create an additional source further down into the producing code where you cannot access the channel easily anymore.
Perhaps changing the name to clone would already improve this? Hm... :thinking:

I do have something else, minor, in this context, though it is not related to just this specific proposal, @FranzBusch:
In the introducing paragraphs you explain that AsyncSequences allow unicast and multicast implementations and then write that AsyncStream implements creating multiple iterators "correctly". I think it's warranted to explain that "correctly" in this sense means that it compiles and runs without error, but I don't think its behavior is "multicast".
After all, yielded values are not delivered to each consumer, but each value arrives at just one consumer, right? Multicast would basically include copying the values so each consumer gets its own copy for each yield. Which has consequences for ~Copyable elements and sendability, correct? At least that's how I understood the term so far (might be wrong).
I say this because that paragraph appears in several proposals now and googling about multicast streams in swift has them show up eventually.

I’ve been waiting for this since it was first pitched for the standard library. So I am really pleased to see this again.

I applaud the desire to indicate consuming and producing capabilities of the AsyncSequence. Too many of the standard AsyncSequences do not make it clear in their documentation their capabilities in this area. I tend to have to search the forums to get this information. But including it as part of the name seems overkill. Are we to expect a MultiProducerMultiConsumerAsyncChannel in the near future? Although that’s AsyncChannel isn’t it?

I thought the main reason for this type was to have an equivalent of AsyncStream but with controllable back pressure. But back pressure isn’t mentioned in the type name.


I think either clone or copy are perfectly valid for this method. I went with copy since it matches the auto-synthesized copy method on Copyable types.

I think this is a fair point and multicast is severely under-defined. What I meant in this proposal specifically is an async sequence that allows multiple consumers but doesn't necessarily prescribe how elements are distributed between them. AsyncStream implements a FIFO-based multicast. However, there are many more valid multicast patterns such as broadcast (each element is shared with every consumer), watch (similar to broadcast but only the latest element is buffered), or round-robin (elements are distributed round-robin across the consumers). Now, similar to multicast, even those terms are not set in stone.

Yes. I expect that we want to have more channel types that all offer slightly different behaviour. Currently, I personally would like to see support for:

  • multi-producer-single-consumer
  • multi-producer-multi-consumer (different variants e.g. broadcast or watch)
  • single/one-shot (one element between a single producer and single consumer)

Each of those types requires a slightly different implementation and public API. Trying to cram all of them into the same type results in worse performance and API design challenges.

Initially, I started including back pressure in the type name; however, after having worked on the shape of this type for a long time now I feel that the producer/consumer behavior is way more important than that it supports external back pressure. That's IMO more of a baseline requirement for any type of channel.
