SE-0314: AsyncStream and AsyncThrowingStream

What is your evaluation of the proposal?

-1 specifically on two details:

  • The default unlimited buffering behaviour is unideal.
  • Lack of backpressure support.

Both points have been previously brought up in the pitch thread.

The quickest way to illustrate why this is an issue, is perhaps simply by trying to lift some arbitrarily large Collections or Sequences into the async domain. These are not an uncommon use case:

  • (mentioned in the pitch) paginated read from a large file or a remote resource
  • a Int.min ... Int.max range
  • a random number generator
  • a cycled collection
  • maybe just a very large buffer in memory.

Users do often compose these kind of infinite or resource-backed streams, with other streams provided by the system, or defined by their own application logic. The proposal caters for the latter, but does not produce an answer for the former.

The default behaviour of maxBufferedElements: Int = .max, means that yield() will accept them, and append them into the buffer. Since there is no backward indication or mechanism to indicate that the buffer is filling up, iterations are allowed to continue. Eventually, these collections or resources will exhausitvely consumed, until either we run out of memory to allocate for the buffer, or we stuck in an infinite loop long enough to be killed by the OS, whichever comes first.

Specifying a maxBufferedElements in 0 ..< .max does no help either, because now it introduces the drop-oldest-on-full behaviour. It might work depending on the use case, but it is not a general solution.

Another way to summarize these issues is that:

  1. There is no way for a producer to regulate itself based on the rate of consumption, aka. backpressure support.

  2. The unlimited buffering is a divergent from the current mental model of Sequence. If you do:

    let infiniteEnergyGenerator: AnySequence<T>
    
    for i in infiniteEnergyGenerator {
        break
    }
    
    print("Hello world")
    

    Will Swift exhaustively consume infiniteEnergyGenerator? No, it stops consuming it, as soon as the programmer instructs it to do so.

    So if the base language works like this, can we achieve the same with AsyncStream as is? Not either. We are getting stuck in an infinite loop in the initializer of AsyncStream draining the infinite resource.

In short, I am skeptical about accepting the proposal as is — with a unfortunate dangerous default and absence of an important mechanism — into the Swift Standard Library.

Comparing to reactive streams libraries we currently have, AsyncStream is born into a world with suspendable async functions. All the above problems, that have been headaches for us reactive streams library maintainers, can be naturally solved with the liberty of not having to synchronously return to the caller. Let's perhaps seal these problems as part of the history, rather than carrying them into the new Swift async world.

14 Likes