[Pitch] MultiProducerSingleConsumerAsyncChannel

Hey everyone,

I just got around to updating my open PR in swift-async-algorithms to add a new MultiProducerSingleConsumerAsyncChannel. This PR is a follow-up to my previous Swift Evolution proposal, SE-0406 Backpressure support for AsyncStream. Below is a copy of the motivation and proposed solution sections of the pitch.

MultiProducerSingleConsumerAsyncChannel

Revision

  • 2023/12/18: Migrate proposal from Swift Evolution to Swift Async Algorithms.
  • 2023/12/19: Add element size dependent strategy
  • 2024/05/19: Rename to multi producer single consumer channel
  • 2024/05/28: Add unbounded strategy
  • 2025/03/24: Adopt ~Copyable for correct semantics and better performance.

Introduction

SE-0314 introduced new Async[Throwing]Stream types which act as root asynchronous sequences. These two types allow bridging from synchronous callbacks such as delegates to an asynchronous sequence. This proposal adds a new root primitive with the goal to model asynchronous multi-producer-single-consumer systems.

Motivation

After using the AsyncSequence protocol, the Async[Throwing]Stream types, and the Async[Throwing]Channel types extensively over the past years, we learned that there is a gap in the ecosystem for a type that provides strict multi-producer-single-consumer guarantees with external backpressure support. Additionally, any stream/channel like type needs to have a clear definition about the following behaviors:

  1. Backpressure
  2. Multi/single consumer support
  3. Downstream consumer termination
  4. Upstream producer termination

The sections below explain each of these in detail.

Backpressure

In general, backpressure is the mechanism that prevents a fast producer from
overwhelming a slow consumer. It helps the stability of the overall system by
regulating the flow of data between different components. Additionally, it
allows putting an upper bound on the resource consumption of a system. In
practice, backpressure is used in almost all networked applications.

In Swift, asynchronous sequences also have the concept of internal backpressure. This is modeled by the pull-based implementation where a consumer has to call next on the AsyncIterator. In this model, there is no way for a consumer to overwhelm a producer since the consumer controls the rate of pulling elements.

However, the internal backpressure of an asynchronous sequence isn't the only backpressure in play. There is also the backpressure of the source that is producing the actual elements. For a backpressured system, it is important that every component is aware of the backpressure of its consumer and its producer.

Let's take a quick look at how our current root asynchronous sequences handle this.

Async[Throwing]Stream aims to support backpressure by providing a configurable buffer and returning Async[Throwing]Stream.Continuation.YieldResult, which contains the current buffer depth, from the yield() method. However, only providing the current buffer depth on yield() is not enough to bridge a backpressured system into an asynchronous sequence: it can only be used as a "stop" signal, and there is no signal that indicates when to resume production. The only viable backpressure strategy that can be implemented with the current API is a timed backoff, where we stop producing for some period of time and then speculatively produce again. This pattern is inefficient: it adds latency and wastes resources.
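To make the limitation concrete, here is a minimal sketch that uses only the standard library. The function name yieldResultsDemo and the buffer size of 2 are illustrative choices, not part of the proposal. It shows that yield() reports the buffer state at the moment of the call but offers nothing to wait on until space frees up again.

```swift
// Sketch only: `yieldResultsDemo` is a hypothetical helper for illustration.
func yieldResultsDemo() -> [String] {
    // A bounded buffer that keeps the two oldest elements.
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingOldest(2)
    )

    var outcomes: [String] = []
    for value in 1...3 {
        // Nobody is consuming, so the buffer fills up.
        switch continuation.yield(value) {
        case .enqueued(let remaining):
            outcomes.append("enqueued \(value) (remaining: \(remaining))")
        case .dropped(let element):
            // The "stop" signal. There is no matching "resume" signal.
            outcomes.append("dropped \(element)")
        case .terminated:
            outcomes.append("terminated")
        @unknown default:
            outcomes.append("unknown")
        }
    }

    // Keep the stream alive until all yields are done so that the
    // buffer is not torn down early.
    withExtendedLifetime(stream) {}
    return outcomes
}

// With a limit of 2, the third yield is reported as dropped.
print(yieldResultsDemo())
```

Once an element is dropped, the producer's only option is to guess when to try again, which is the timed backoff described above.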

Async[Throwing]Channel is a multi-producer-multi-consumer channel that only
supports asynchronous producers. Additionally, the backpressure strategy is
fixed by a buffer size of 1 element per producer.

We are currently lacking a type that supports a configurable backpressure
strategy and both asynchronous and synchronous producers.

Multi/single consumer support

The AsyncSequence protocol itself makes no assumptions about whether the implementation supports multiple consumers or not. This allows the creation of unicast and multicast asynchronous sequences. The difference between a unicast and multicast asynchronous sequence is if they allow multiple iterators to be created. AsyncStream does support the creation of multiple iterators and it does handle multiple consumers correctly. On the other hand, AsyncThrowingStream also supports multiple iterators but does fatalError when more than one iterator has to suspend. The original proposal states:

As with any sequence, iterating over an AsyncStream multiple times, or
creating multiple iterators and iterating over them separately, may produce an
unexpected series of values.

While that statement leaves room for any behavior, we learned that a clear distinction of behavior for root asynchronous sequences is beneficial, especially when it comes to how transformation algorithms are applied on top.
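As an illustration of the AsyncStream behavior described in this section, the following sketch iterates one stream from two child tasks. The function name deliverToTwoConsumers is hypothetical, and the example relies on AsyncStream (not AsyncThrowingStream) accepting more than one suspended iterator, as stated above. Each element is handed to exactly one of the consumers; which one receives it is not deterministic, so the result is sorted before it is returned.

```swift
// Sketch only: `deliverToTwoConsumers` is a hypothetical helper.
func deliverToTwoConsumers() async -> [Int] {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    for value in 1...6 {
        continuation.yield(value)
    }
    continuation.finish()

    return await withTaskGroup(of: [Int].self) { group in
        // Two consumers, each with its own iterator over the same stream.
        for _ in 0..<2 {
            group.addTask {
                var received: [Int] = []
                for await value in stream {
                    received.append(value)
                }
                return received
            }
        }

        // Merge what both consumers received.
        var delivered: [Int] = []
        for await part in group {
            delivered += part
        }
        return delivered.sorted()
    }
}

let delivered = await deliverToTwoConsumers()
print(delivered)
```

Taken together, the two consumers see every element once, but neither of them is guaranteed to see all of them.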

Downstream consumer termination

Downstream consumer termination allows the producer to notify the consumer that no more values are going to be produced. Async[Throwing]Stream supports this through the finish() or finish(throwing:) methods of the Async[Throwing]Stream.Continuation. However, Async[Throwing]Stream does not handle the case where the Continuation is deinited before one of the finish methods is called. This currently leads to async streams that never terminate.

Upstream producer termination

Upstream producer termination is the inverse of downstream consumer termination where the producer is notified once the consumption has terminated. Currently, Async[Throwing]Stream does expose the onTermination property on the Continuation. The onTermination closure is invoked once the consumer has terminated. The consumer can terminate in four separate cases:

  1. The asynchronous sequence was deinited and no iterator was created
  2. The iterator was deinited and the asynchronous sequence is unicast
  3. The consuming task is canceled
  4. The asynchronous sequence returned nil or threw

Async[Throwing]Stream currently invokes onTermination in all cases; however, since Async[Throwing]Stream supports multiple consumers (as discussed in the Multi/single consumer support section), a single consumer task being canceled leads to the termination of all consumers. This is not expected from multicast asynchronous sequences in general.

Proposed solution

The above motivation lays out the expected behaviors for any consumer/producer system and compares them to the behaviors of Async[Throwing]Stream and Async[Throwing]Channel.

This section proposes a new type called MultiProducerSingleConsumerAsyncChannel that implements all of the above-mentioned behaviors. Importantly, this proposed solution takes advantage of ~Copyable types to model the multi-producer-single-consumer behavior. While the current AsyncSequence protocols do not support ~Copyable types, we provide a way to convert the proposed channel to an asynchronous sequence. This leaves us room to support any potential future asynchronous streaming protocol that supports ~Copyable.

Creating a MultiProducerSingleConsumerAsyncChannel

You can create a MultiProducerSingleConsumerAsyncChannel instance using the makeChannel(of:backpressureStrategy:) method. This method returns the channel and the source. The source can be used to send new values to the asynchronous channel. The new API specifically provides a multi-producer/single-consumer pattern.

let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

// The channel and source can be extracted from the returned type
let channel = consume channelAndSource.channel
let source = consume channelAndSource.source

The new proposed APIs offer two different backpressure strategies:

  • Watermark: Using a low and high watermark
  • Unbounded: Unbounded buffering of the channel. Only use this if the
    production is limited through some other means.
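The watermark idea can be sketched in a few lines. The type below is a hypothetical model written for this post, not the implementation from the PR: it assumes that production continues while the buffered count is below the high watermark, and that a paused producer is resumed once the count drops below the low watermark.

```swift
// Sketch only: a hypothetical model of a low/high watermark strategy.
struct ToyWatermarkStrategy {
    let low: Int
    let high: Int
    private(set) var buffered = 0

    init(low: Int, high: Int) {
        precondition(low <= high, "low must not exceed high")
        self.low = low
        self.high = high
    }

    /// Records sent elements. Returns true if the producer should keep producing.
    mutating func didSend(count: Int) -> Bool {
        buffered += count
        return buffered < high
    }

    /// Records one consumed element. Returns true if a paused producer should resume.
    mutating func didConsume() -> Bool {
        buffered -= 1
        return buffered < low
    }
}

func watermarkDemo() -> [Bool] {
    var strategy = ToyWatermarkStrategy(low: 2, high: 4)
    var decisions: [Bool] = []
    decisions.append(strategy.didSend(count: 3)) // 3 buffered, below high
    decisions.append(strategy.didSend(count: 1)) // 4 buffered, reached high
    decisions.append(strategy.didConsume())      // 3 buffered, not below low
    decisions.append(strategy.didConsume())      // 2 buffered, not below low
    decisions.append(strategy.didConsume())      // 1 buffered, below low
    return decisions
}

print(watermarkDemo())
```

The gap between the two watermarks avoids flipping between "pause" and "resume" on every single element.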

The source is used to send values to the channel. It provides different APIs for
synchronous and asynchronous producers. All of the APIs relay the
backpressure of the channel. The synchronous multi-step APIs are the foundation for all other APIs. Below is an example of how they can be used:

do {
    let sendResult = try source.send(contentsOf: sequence)
    
    switch sendResult {
    case .produceMore:
        // Trigger more production in the underlying system
    
    case .enqueueCallback(let callbackToken):
        // There are enough values in the channel already. We need to enqueue
        // a callback to get notified when we should produce more.
        source.enqueueCallback(token: callbackToken, onProduceMore: { result in
            switch result {
            case .success:
                // Trigger more production in the underlying system
            case .failure(let error):
                // Terminate the underlying producer
            }
        })
    }
} catch {
    // `send(contentsOf:)` throws if the channel already terminated
}

The above API offers the most control and highest performance when bridging a synchronous producer to a MultiProducerSingleConsumerAsyncChannel. First, you send values using send(contentsOf:), which returns a SendResult. The result either indicates that more values should be produced or that a callback should be enqueued by calling the enqueueCallback(callbackToken:onProduceMore:) method. This callback is invoked once the backpressure strategy decides that more values should be produced. The callback only has to be allocated in the case where the producer needs to pause production.

Additionally, the above API is the building block for some higher-level and easier-to-use APIs to send values to the channel. Below is an example of the two higher-level APIs.

// Writing new values and providing a callback when to produce more
try source.send(contentsOf: sequence, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
    case .failure(let error):
        // Terminate the underlying producer
    }
})

// This method suspends until more values should be produced
try await source.send(contentsOf: sequence)

With the above APIs, we should be able to effectively bridge any system into a
MultiProducerSingleConsumerAsyncChannel, regardless of whether the system is callback-based, blocking, or asynchronous.
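The multi-step handshake described in this section (send, then enqueue a callback only when asked to pause) can be modeled without any concurrency. ToyChannel below is a hypothetical, single-threaded stand-in written for illustration. It is not the proposed type: it leaves out termination, errors, and thread safety, and its token is a plain Int.

```swift
// Sketch only: a hypothetical, single-threaded model of the send handshake.
final class ToyChannel {
    enum SendResult {
        case produceMore
        case enqueueCallback(token: Int)
    }

    private let low: Int
    private let high: Int
    private var buffer: [Int] = []
    private var nextToken = 0
    private var waiting: [(token: Int, callback: () -> Void)] = []

    init(low: Int, high: Int) {
        self.low = low
        self.high = high
    }

    /// Buffers all elements and tells the producer whether to continue.
    func send(contentsOf elements: [Int]) -> SendResult {
        buffer.append(contentsOf: elements)
        if buffer.count < high {
            return .produceMore
        }
        nextToken += 1
        return .enqueueCallback(token: nextToken)
    }

    /// Registers a callback that runs once more values should be produced.
    func enqueueCallback(token: Int, onProduceMore: @escaping () -> Void) {
        if buffer.count < low {
            // Demand already returned between send and enqueue.
            onProduceMore()
        } else {
            waiting.append((token: token, callback: onProduceMore))
        }
    }

    /// Consumes one element and resumes paused producers when below `low`.
    func next() -> Int? {
        guard !buffer.isEmpty else { return nil }
        let element = buffer.removeFirst()
        if buffer.count < low {
            let resumed = waiting
            waiting.removeAll()
            for entry in resumed { entry.callback() }
        }
        return element
    }
}

func toyChannelDemo() -> [String] {
    let channel = ToyChannel(low: 2, high: 4)
    var log: [String] = []

    switch channel.send(contentsOf: [1, 2, 3, 4]) {
    case .produceMore:
        log.append("produce more")
    case .enqueueCallback(let token):
        log.append("paused")
        channel.enqueueCallback(token: token) { log.append("resumed") }
    }

    for _ in 0..<3 {
        if let element = channel.next() {
            log.append("consumed \(element)")
        }
    }
    return log
}

print(toyChannelDemo())
```

In this model the callback closure is only created on the pause path, which mirrors the point above that the callback only has to be allocated when the producer needs to pause.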

Multi producer

To support multiple producers, the source offers a copy method to produce a new source. The new source is returned as sending, so it is in a disconnected isolation region from the original source. This allows passing it into a different isolation region to concurrently produce elements.

let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 5)
)
var channel = consume channelAndSource.channel
var source1 = consume channelAndSource.source
var source2 = source1.copy()

group.addTask {
    try await source1.send(1)
}

group.addTask {
    try await source2.send(2)
}

print(await channel.next()) // Prints either 1 or 2 depending on which child task runs first
print(await channel.next()) // Prints either 1 or 2 depending on which child task runs first

Downstream consumer termination

When reading the next two examples around termination behaviour, keep in mind
that the newly proposed APIs provide a strict single-consumer channel.

Calling finish() terminates the downstream consumer. Below is an example of
this:

// Termination through calling finish
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source

try await source.send(1)
source.finish()

print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints nil

If the channel has a failure type it can also be finished with an error.

// Termination through calling finish with an error
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    throwing: SomeError.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source

try await source.send(1)
source.finish(throwing: SomeError())

print(try await channel.next()) // Prints Optional(1)
print(try await channel.next()) // Throws SomeError

The other way to terminate the consumer is by deiniting the source. This has the same effect as calling finish(). Since the source is a ~Copyable type this will happen automatically when the source is last used or explicitly consumed.

// Termination through deiniting the source
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source

try await source.send(1)
_ = consume source // Explicitly consume the source

print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints nil
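The deinit-based termination can be pictured as a count of live sources. The sketch below is a hypothetical model, not the storage used in the PR: ToySource is a ~Copyable struct whose initializer and deinit adjust a counter in shared storage, and the storage is marked finished once the last source is gone. It is not thread-safe; a real implementation would need synchronization.

```swift
// Sketch only: hypothetical types that model "finish when the last source is deinited".
final class SharedStorage {
    var activeSources = 0
    var isFinished = false
}

struct ToySource: ~Copyable {
    private let storage: SharedStorage

    init(storage: SharedStorage) {
        self.storage = storage
        storage.activeSources += 1
    }

    /// Explicitly creates another source that feeds the same storage.
    func copy() -> ToySource {
        ToySource(storage: storage)
    }

    deinit {
        storage.activeSources -= 1
        if storage.activeSources == 0 {
            // Last source is gone: behave as if finish() had been called.
            storage.isFinished = true
        }
    }
}

func sourceLifetimeDemo() -> [Bool] {
    let storage = SharedStorage()
    let source1 = ToySource(storage: storage)
    let source2 = source1.copy()

    var finishedStates: [Bool] = []
    _ = consume source1 // One source is still alive.
    finishedStates.append(storage.isFinished)
    _ = consume source2 // The last source is gone.
    finishedStates.append(storage.isFinished)
    return finishedStates
}

print(sourceLifetimeDemo())
```

Because the struct cannot be copied implicitly, every additional producer is visible in the code as an explicit copy() call, and the counter cannot drift.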

Upstream producer termination

The producer gets notified about termination through the callback set with
setOnTerminationCallback. Termination of the producer happens in the following scenarios:

// Termination through task cancellation
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }

let task = Task {
    await channel.next()
}
task.cancel() // Prints Terminated

// Termination through deiniting the channel
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }
_ = consume channel // Prints Terminated

// Termination through finishing the source and consuming the last element
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source = consume channelAndSource.source
source.setOnTerminationCallback { print("Terminated") }

_ = try await source.send(1)
source.finish()

print(await channel.next()) // Prints Optional(1)
await channel.next() // Prints Terminated

// Termination through deiniting the last source and consuming the last element
let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel
var source1 = consume channelAndSource.source
var source2 = source1.copy()
source1.setOnTerminationCallback { print("Terminated") }

_ = try await source1.send(1)
_ = consume source1
_ = try await source2.send(2)

print(await channel.next()) // Prints Optional(1)
print(await channel.next()) // Prints Optional(2)
_ = consume source2
await channel.next() // Prints Terminated

Similar to the downstream consumer termination, trying to send more elements after the producer has been terminated will result in an error thrown from the send methods.

Rest of the proposal

The rest of the proposal including the detailed design, future directions and alternatives considered sections can be found in the PR.

Looking forward to hearing what everyone thinks about this!

28 Likes

I'm very eager for this! I'd love to see typed throws support in this type as well. I think it's only missing when an error is thrown because a send was called during termination. We could easily expose that TerminationError (or whatever its name is) in the signature.

Small sidenote, the proposal in the PR has a formatting issue at:

swift-nio: NIOAsyncSequenceProducer

2 Likes

It's great to see more high quality tools for "proper" data flow arrive. The current state around Async[Throwing]Stream frankly feels quite dodgy. The more easy-to-use and correct-by-default building blocks we have at the ready, the better!

I am not a fan of the consume and copy dance, it does not feel swifty...

let channelAndSource = MultiProducerSingleConsumerChannel.makeChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var channel = consume channelAndSource.channel // meh...
var source1 = consume channelAndSource.source // meh...
var source2 = source1.copy() // meh...

Couldn't we get the same non-copyable/ownership stuff going by something like this:

let channel = MultiProducerSingleConsumerChannel(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
var source1 = channel.makeSource()
var source2 = channel.makeSource()

await channel.next()
_ = consume channel

3 Likes

I understand where you are coming from but I think for such an important primitive it is more important to be correct and as performant as possible. ~Copyable types are giving us a great performance edge here over the previous class backed approaches in other AsyncSequences. The consumes that you called out when tearing apart the channelAndSource are only needed right now because tuples don't support ~Copyable types. Otherwise, it might be possible to create a makeChannel method that returns a tuple with those types.

In this short example, this might look like a good approach but in reality the channel is passed to an entirely different system than the source. Since the types are both ~Copyable we must allow the source to create additional sources. Adding this to the Channel will in practice often cause problems since you don't have it in the producing system.

2 Likes

Yeah, I thought as much, and I understand what this is trying to achieve. It just looks ugly (objective true facts ; ) and I feel there is a nicer API possible here somehow. But a ~Copyable tuple will help a lot.

This is what feels most out of place for me:

var source2 = source1.copy()

I am not convinced that a "source" should also be a "sourceMaker", if you know what I mean :thinking:.

I think what would help to get a better feel for this is to see the intended API for a "multi-cast" type channel. Clearly, calling .copy() on the receiving bit would be funky. So there will have to be some "thing" that vends the various receiving bits on-demand. I am not saying there necessarily needs to be symmetry, but I don't really see it all clicking together nicely yet.

2 Likes

Small typo I guess, in the title and here it is called MultiProducerSingleConsumerAsyncChannel (emphasis Async) and in the implementation it is called MultiProducerSingleConsumerChannel

1 Like

Thanks. Don't know how I missed this. Updated the PR to MultiProducerSingleConsumerAsyncChannel.

2 Likes

This is a great proposal which also helped me grasp the underlying concepts of this entire area a lot better.
The only minor thing is indeed the mentioned source.copy() methods that feel a bit... surprising? In the sense that explicitly copying things in Swift feels weird. On the other hand I do understand the reason, and I also think that it's not unlikely to want to create an additional source further down into the producing code where you cannot access the channel easily anymore.
Perhaps changing the name to clone would already improve this? Hm... :thinking:

I do have something else, minor, in this context, though, but it is not related to just this specific proposal, @FranzBusch:
In the introducing paragraphs you explain that AsyncSequences allow unicast and multicast implementations and then write that AsyncStream implements creating multiple iterators "correctly". I think it's warranted to explain that "correctly" in this sense means that it compiles and runs without error, but I don't think its behavior is "multicast".
After all, yielded values are not delivered to each consumer, but each value arrives at just one consumer, right? Multicast would basically include copying the values so each consumer gets its own copy for each yield. Which has consequences for ~Copyable elements and sendability, correct? At least that's how I understood the term so far (might be wrong).
I say this because that paragraph appears in several proposals now and googling about multicast streams in swift has them show up eventually.

I’ve been waiting for this since it was first pitched for the standard library. So am really pleased to see this again.

I applaud the desire to indicate consuming and producing capabilities of the AsyncSequence. Too many of the standard AsyncSequences do not make it clear in their documentation their capabilities in this area. I tend to have to search the forums to get this information. But including it as part of the name seems overkill. Are we to expect a MultiProducerMultiConsumerAsyncChannel in the near future? Although that’s AsyncChannel isn’t it?

I thought the main reason for this type was to have an equivalent of AsyncStream but with controllable back pressure. But back pressure isn’t mentioned in the type name.

3 Likes

I think either clone or copy are perfectly valid for this method. I went with copy since it matches the copy operator that Swift provides for Copyable types.

I think this is a fair point and multicast is severely under-defined. What I meant in this proposal specifically is an async sequence that allows multiple consumers but doesn't necessarily prescribe how elements are distributed between them. AsyncStream implements a FIFO based multicast. However, there are many more valid multicast patterns such as broadcast (each element is shared with every consumer), watch (similar to broadcast but only the latest element is buffered), or round-robin (elements are distributed round-robin across the consumers). Now, similar to multicast, even those terms are not set in stone.

Yes. I expect that we want to have more channel types that all offer slightly different behaviour. Currently, I personally would like to see support for:

  • multi-producer-single-consumer
  • multi-producer-multi-consumer (different variants e.g. broadcast or watch)
  • single/one-shot (one element between a single producer and single consumer)

Each of those types requires a slightly different implementation and public API. Trying to cram all of them into the same type results in worse performance and API design challenges.

Initially, I started including back pressure in the type name; however, after having worked on the shape of this type for a long time now I feel that the producer/consumer behavior is way more important than that it supports external back pressure. That's IMO more of a baseline requirement for any type of channel.

2 Likes

Nitpick/typo, I think it should be:

there is no way for a consumer to overwhelm a producer since the CONSUMER controls the rate of pulling elements.

Changed “producer controls the rate of pulling elements” to “CONSUMER controls the rate of pulling elements”.

If you are using AsyncStream then most probably you have an unbounded buffer and unlimited production. This is the default, so this is what most of the people use. As you say: you can inspect the AsyncStream.Continuation.YieldResult looking for any dropped(Element) and introduce a delay if that happens, but the code for that is very complicated.

It also needs to preserve the order of the produced elements which may not be easy if the source is some kind of xxxDelegate (pretty common in Apple frameworks). For example if we want to transform the UserLocationChangedDelegate into an AsyncStream then we have to produce those locations in the same order as we received from the source.

I would say that AsyncStream has no back-pressure solution, apart from the buffer. It is really difficult to handle the AsyncStream.Continuation.YieldResult correctly to simulate other strategies, it is just not the right tool for that.

“per producer” part in “fixed by a buffer size of 1 element PER PRODUCER” is very important, as there is no shared buffer usable by multiple producers. We can improve the throughput by adding AsyncSequence.buffer(policy: AsyncBufferSequencePolicy) that is already present in async algorithms.

What exactly would be the difference between AsyncChannel + AsyncSequence.buffer and the proposed solution? AsyncChannel gives us back-pressure and AsyncSequence.buffer allows us to over-produce a little bit.

As I understand the biggest contribution that MultiProducerSingleConsumerAsyncChannel brings is the ability to copy the producer and then finish those producers independently?

A quick summary from the producer side would be:

  • AsyncStream - no back pressure (unless you try really hard)
  • AsyncChannel - send a single value, block until it is consumed
  • AsyncChannel + AsyncSequence.buffer
    • send multiple values or block if the shared buffer is full
    • can send from multiple tasks
  • MultiProducerSingleConsumerAsyncChannel
    • send multiple values or block if the shared buffer is full
    • multiple independent producers that can send from multiple tasks
    • send is both sync and async

In theory you could replicate the behavior of MultiProducerSingleConsumerAsyncChannel by wrapping the AsyncChannel + AsyncSequence.buffer with a type that has a copy method and a counter. If the counter reaches 0 (meaning that all of the child producers are finished) we will call finish on the underlying AsyncChannel.

Anyway, I would recommend adding a summary of the differences between all of the communication methods somewhere as some people (including me) may get confused with the plethora of choices.

Btw. the main README in async-algorithms is outdated - it is missing some types.

English is my 2nd language, but I have a small nitpick about the wording. In Poland rivers flow from mountains (upstream - above the sea level) to the sea (downstream - at the sea level), so the “downstream” part is the consumer.

Here “Downstream consumer termination” means that the producer (upstream) cancelled the consumer (downstream), but the wording suggests otherwise.

Anyway, I think that in this case the “downstream” is a full noun/object, it can be either “Downstream termination” or “Consumer termination”. Having both of them suggests the existence of:

  • downstream consumer termination
  • downstream producer termination
  • upstream consumer termination
  • upstream producer termination

I assume that those independent sources can be finished independently:

var source1 = consume channelAndSource.source
var source2 = source1.copy()

source1.finish()

// Can source2 still be used?
try await source2.send(1)
source2.finish()

I may have missed it, but I do not see the exact sentence for that in the proposal.

No remarks here. Since we have a single consumer then its deinit should finish all of the producers.

If the producer tries to produce after finish then:

do {
  let sendResult = try source.send(contentsOf: sequence)
} catch {
  // `send(contentsOf:)` throws if the channel already terminated
}

For symmetry with AsyncStream.Continuation it should not throw, but instead return an enum with 3 cases similar to AsyncStream.Continuation.YieldResult:

enum SyncSendResult {
  case produceMore
  case enqueueCallback
  /// The stream didn’t enqueue the element because the stream was in a terminal state.
  case terminated
}

This is only a pitch (:peach:/:baseball:) and from what I see the sync and async API can be separated into 2 proposals:

  1. async - introduces the type and usage. I don't think there will be any objections.

  2. sync - enhancement with sync API. From what I know this is a new direction not present in other types - sync api with support for back-pressure. It may require longer discussions.

This will also make it easier to read the proposal(s) as they will focus only on 1 thing.

1 Like

Yep, AsyncChannel supports this behavior, but remember that the buffer size is 1.

You can have 10 producers and 5 consumers that work like this:

  • producer produces a value and waits until one of the consumers picks it
  • consumer waits until one of the producers produces a value

I have a similar experience.

Using AsyncStream directly can be a bit messy as it lacks a precise consumer/producer semantic. You can have a whole code base that uses nothing but AsyncStream, but it is very difficult to work with that. Instead, more defined/precise types are often a better choice, as they convey the expectation on how to properly use them.

Also, having a bunch of those types creates a vocabulary that allows people to talk about high level design. Saying AsyncChannel is faster than “2 way communication with await on both sides”.

I'm definitely +1 on naming stuff with more precise semantics like the proposal does.

Other interesting types would be:

  • AsyncSemaphore with 3 methods: acquire/release/with(closure). Similar to github.com/groue/Semaphore. There is also AsyncBoundedSemaphore which does not allow you to go above the initial value.
    Semaphore can sometimes be an anti-pattern when you create 1000s of tasks. But at least you can tell that the code is correct by just looking at it.

  • AsyncBarrier - block until N tasks are waiting on it.

  • AsyncEvent[Broadcast] - an event that occurs 0 or 1 time (similar to Python asyncio.Event). The event is “sticky” - as soon as it occurs any future wait will resolve immediately.

  • AsyncCounter - just a counter with increment/decrement/set. Sooo… common that we may as well have 1 implementation in async-algorithms. There is nothing async about this type, all of the methods are sync. Atomics are way too low level for things like:

    Download 100 images. Abort if 5 downloads fail.

Special mention for AsyncPubSub/AsyncMessageQueue (similar to akka/Broadcast). For years I've been using a simplified version implemented with a separate AsyncStream for every subscriber:

final class AsyncPubSub<Message: Sendable>: Sendable {

  final class Subscription: Sendable, AsyncSequence { // AsyncIteratorProtocol?
    func makeAsyncIterator() -> AsyncIterator
    /// Unsubscribe.
    /// After this no new messages will be received, and the existing iterator will return `nil`.
    func finish()
    deinit { self.finish() }
  }

  init() { }
  deinit { self.finish() }

  /// Send `Message` to all of the `Subscribers`.
  /// Can be `async` for back pressure.
  func send(_ message: Message)
  /// Subscribe to the `Message` stream. Past `Messages` will NOT be replayed.
  func subscribe(/* Options */) -> Subscription
  /// Finishes all of the `Subscribers`. After this `send` becomes noop.
  func finish()
}

The above is not the best, but it works. Would be nice to have a proper version of this.

For example in iOS the View[Controller/Model] sends a message that is received by:

  • main reducer to handle the operation
  • analytics to aggregate the data
  • #if DEBUG -> printf debugger

I think it would be a good exercise to design a bunch of those types. This way the whole library will feel more coherent.

1 Like

i’m really happy this is finally getting some attention, and i have a few suggestions for improving the text of the proposal since, well, evolution proposals have a tendency to become the documentation that everyone relies on, for lack of Actual Documentation.

first, i think it’s really important to repeatedly and annoyingly mention where Async[Throwing]Channel comes from, because AsyncSequence, AsyncStream, etc. live in the standard library, but Async[Throwing]Channel lives in an external package, and this won’t be immediately obvious to someone skimming the proposal from a Google search, unless they’re already experienced users of swift-async-algorithms. i always find it frustrating when i read a JavaScript tutorial, and i find that the example doesn’t work because the tutorial was actually using a JQuery API.

so i suggest adding something in parentheses like (which is defined in the swift-async-algorithms package) after the first occurrence of Async[Throwing]Channel in each paragraph.

i’m not sure what this part means, so i’d love if the next draft included a brief example of when AsyncThrowingStream would and wouldn’t fatalError from multiple iterators.

we should probably add the word “last” so that this reads last Continuation may be deinited. Async[Throwing]Stream is already a multi-producer async sequence, reminding the reader that it’s not an (Async)Channel (a single producer sequence) helps everyone keep their mental model of these APIs straight.

i’m confused by what happens if the (synchronously batched) sequence contains more elements than fit in the channel buffer. are those elements discarded? does the channel overflow its high water mark?

i understand this is an early draft, but keeping in mind that this document will become the reference that everyone uses to remember how to use Swift async channels, we really should clearly state if these APIs actually exist or not. it’s not located under the Future Directions section, but it’s also worded as if these APIs are Speculative and not actually part of the proposal. so this is very unclear to me.

i am okay with copy(). i think that we implicitly copy too much in Swift, and creating a second producer is a meaningful enough event that it really should be explicit. but maybe we could mitigate the superficial aversion to explicit copy() by naming this method fork() or something else that emphasizes that it affects the behavior of the first source, and the consumer.

thanks for all your work on swift-async-algorithms!

4 Likes

Another possible convention here that might help for documentation is to fully qualify the type on first mention:

We use AsyncAlgorithms.Async[Throwing]Channel for our tutorial…
…
And the Async[Throwing]Channel instance we just constructed can be used for…

2 Likes