[question]: (how to) re-introduce concurrent iteration runtime error for `AsyncStream`

the documentation for the AsyncIterators of AsyncStream and AsyncThrowingStream both include the following form of admonition (emphasis mine):

This type doesn’t conform to Sendable. Don’t use it from multiple concurrent contexts. It is a programmer error to invoke next() from a concurrent context that contends with another such call, which results in a call to fatalError().

however, only one of these two types (AsyncThrowingStream) currently implements the alluded-to runtime error if such a state is detected.

this difference appears incidental, and it would benefit developers to have both behavioral consistency across the throwing and non-throwing stream variants, as well as accurate documentation.

the removal of the runtime error appears to have been introduced in this PR, but it is unclear if it was intentional (i would guess not). according to github, this means it first shipped in Swift 5.7.

addressing this would presumably be quite straightforward (just need to revert a single line), but it's unclear to me what the actual process for realizing that is (open a PR? file a radar? evolution proposal?). so i ask – how can such a change be made?

@Philippe_Hausler or @FranzBusch, perhaps when you have a moment, you could weigh in with your thoughts on how to move this forward (or reasons not to). thanks in advance!

So this boils down to the Sendability of the type and the enforcement of that. In general, runtime fatal errors are for cases where something goes horribly wrong and is totally unrecoverable, but in this case there are legitimate use cases for sending an AsyncStream (and the throwing variant too...) from the creation scope to a new concurrency domain. It is worth noting that this was well before the transferring concept. However, making the type Sendable meant that there was a case that had to be addressed: what happens if two tasks concurrently have a hold of the same iteration source?

The story concludes with the fatal error being removed due to that pernicious question of what happens if it is in two tasks; instead the behavior was accepted to consume the values atomically (since it had to support that already).

The lack of symmetry of the two types is unfortunate and we should have consistency (and update the docs). These are outstanding tasks to be resolved.

thanks for the response!

this makes sense to me, but what i struggle to conceive of is a compelling reason to support or encourage concurrent iteration. were there specific use cases that motivated this change? IIUC, today AsyncStream is (conditionally) Sendable, but its Iterator is not, which in some sense feels like it implies this should still not be done.

setting aside the inconsistencies in the docs, differences b/w throwing and non-throwing streams and known bugs, as currently implemented, concurrent iteration still seems like it suffers from two significant issues that are likely to be unpleasant surprises to developers:

  1. each consumer will only see an arbitrary subset of the stream's data
  2. cancellation of any Task iterating the stream terminates the entire stream

IMO these behaviors can cause subtle bugs and are worse than simply saying 'this functionality is unsupported' and crashing when it is detected at runtime.
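To make the second point in the list above concrete, here is a minimal sketch (not from the thread). It assumes Swift 5.9 or later for `AsyncStream.makeStream(of:)`, and the function name `cancelOneConsumer` is made up for illustration. It reflects the behavior of the current implementation as I understand it, which may change if the semantics are revised.

```swift
// Sketch: cancelling one Task that iterates an AsyncStream terminates the
// stream for everyone. `cancelOneConsumer` is a hypothetical helper name.
func cancelOneConsumer() async -> (yieldRejected: Bool, otherConsumerSawEnd: Bool) {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    // First consumer: iterates the stream inside its own Task.
    let consumer = Task {
        for await _ in stream {}
    }

    // Cancel only that one consumer and wait for it to exit.
    consumer.cancel()
    await consumer.value

    // The producer can no longer deliver elements: the stream is terminated.
    var yieldRejected = false
    if case .terminated = continuation.yield(1) {
        yieldRejected = true
    }

    // A second consumer that was never cancelled sees the end of the
    // stream right away.
    var iterator = stream.makeAsyncIterator()
    let next = await iterator.next()

    return (yieldRejected, next == nil)
}
```

The second consumer here did nothing wrong and was never cancelled, yet it gets `nil` because another Task's cancellation finished the shared stream.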

so, regarding the specific proposal question here (re-adding the runtime error) – is there a plausible path to do that, or has that ship sailed?

Producer-consumers.

I suspect the crux of Swift's problems here is the lack of a coherent, comprehensive story for data channels. There are countless use-cases for every logical permutation of functionality (single vs multiple producers, single vs multiple consumers, broadcast vs single recipient). Trying to play favourites just creates problems.

What it really needs isn't piecemeal tweaking of behaviours, but rather a proper replacement (in the stdlib specifically) that covers all these bases. Then it can be made clear to people what the options are, what their differences are, and thus which they should use follows much more easily.

ah yes. to clarify, i didn't mean that in general i don't see motivating use cases for concurrent iteration. i mean that in this specific case AsyncStream was originally written to be effectively 'single-consumer' (or perhaps more accurately, single-consumer at a time), and was later changed to be 'multi-consumer', but not in a way that IMO really functions appropriately. or at least, the behavior is different enough from the various other (rough) analogs in the ecosystem (PassthroughSubject, Signals, etc) that it is a rather sharp edge.

definitely agree that a more holistic story here would be welcome, but perhaps in the interim the best thing to strive for is better documentation.

I agree that the current semantics of both AsyncStream variants need to be clarified and, ideally, aligned. In my proposal SE-0406, I outlined some of the things that we should clarify on the type.

Overall we have to be careful though since users are relying on the current behaviors and we shouldn’t break their code. In my opinion, the best step forward is to align the non-throwing and throwing variant of AsyncStream w.r.t. the multi consumption behavior. The only possible alignment here is to go with a multi-producer-multi-consumer behavior but without broadcast. So every element is forwarded to exactly one consumer in FIFO consumer order.
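A minimal sketch of what "multi-consumer without broadcast" means in practice with today's non-throwing AsyncStream. This is an illustration, not code from the thread: it assumes Swift 5.9 or later for `AsyncStream.makeStream(of:)`, and the names `consume` and `splitAcrossTwoConsumers` are hypothetical. All elements are buffered and the stream is finished before either consumer starts, so the example does not depend on how a suspended consumer gets resumed at termination.

```swift
// Collects everything a single consumer receives from the stream.
func consume(_ stream: AsyncStream<Int>) async -> [Int] {
    var seen: [Int] = []
    for await value in stream {
        seen.append(value)
    }
    return seen
}

// Two consumers iterate the same stream concurrently. Each element is
// delivered to exactly one of them; nothing is broadcast to both.
func splitAcrossTwoConsumers(count: Int) async -> ([Int], [Int]) {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    // Buffer all elements up front (the default buffering policy is
    // unbounded), then finish the stream.
    for i in 0..<count {
        continuation.yield(i)
    }
    continuation.finish()

    async let first = consume(stream)
    async let second = consume(stream)
    return await (first, second)
}
```

Together the two arrays contain every element exactly once, but how the elements are split between the consumers depends on scheduling; one consumer may well receive everything and the other nothing.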

At the same time, I am currently cleaning up my PR in async algorithms where I want to pitch a MultiProducerSingleConsumerChannel. This new type enforces that only a single iterator is created. Additionally, it exposes the consumer's back pressure to the producer. I want to make a few last tweaks before I open a review thread.

As for the existing types we "just" need to land the SE-0406 proposal which will do some of the cleanup requested here.

As for an actual good way forward: fan-out needs to be explicit and behaviors configurable.

We cannot pick one behavior and say it's "the one"; as such, concrete streams should always be single-subscriber (this is the same idea as when we designed reactive streams, which are the "base" stream/subscription/sequence semantics).

What does that look like in practice? Explicit operators which define the various semantics, like:

  • cancellation:
    • cancel when one cancels
    • cancel when all cancel
    • etc
  • buffering / replay / live:
    • buffer and replay to all subscribers
    • broadcast from current element to all subscribers
    • round robin elements to active subscribers
  • when elements are emitted
  • when completion is signaled

In practice this then creates a number of operators, some with "options". In my previous life, working on Akka streams, we came up with a bunch of operators like "wireTap", "alsoTo", "alsoToAll" etc. List here: Operators • Akka Documentation

And most importantly, every operator explicitly documents its behavior, with the same table format of information used for every operator.

I think that we should follow a similar pattern here, and consistently use the same "template" (that we need to come up with) for an operator's semantics, and document every single operator, and especially fan-out operators, in this style.

Of course I don't mean to imply we just use the same template or specific reactive streams semantics, but a general "template" for understanding the behavior of streams and consistently documenting them all is something we could definitely make use of! It'd have to focus on Swift and async/await/AsyncSequence specific wording and semantics etc. I think this would definitely help using and understanding async streams in Swift :slight_smile:

And as we'd perhaps manage to revive the broadcast() proposal (?), that'd be a great one to put this into practice -- along with the SE-0406 work, during which we could clean up some docs and behaviors :slight_smile:

Isn’t this Combine?

Similar, but unless I'm on a different wavelength what we're talking about here is a Structured Concurrency-integrated, imperative equivalent. Combine's one way to do it, but not to everyone's taste (I use it where necessary, for example, because it does have some pretty good features that aren't well replicated elsewhere, but I'm not a fan of its "pipeline of doom" style).

That sounds like Combine. Best think of AsyncStream like the inverse of Combine.

It wouldn't break their code if they used AsyncStream correctly, i.e. with one consumer, which surely is going to be the vast majority.