`AsyncStream` constructor which also returns its Continuation

i keep finding myself writing really awkward code when dealing with AsyncStream<T>.Continuations, such as

actor Simulator 
{
}

...

var continuation:AsyncStream<Message>.Continuation?     = nil 
let stream:AsyncStream<Message>                         = .init 
{
    continuation = $0 
}
if let continuation:AsyncStream<Message>.Continuation   = continuation 
{
    await simulator.subscribe(continuation, to: "my channel name")
}
else 
{
    fatalError("unreachable") 
}
return stream 

is there any way we can add an API to AsyncStream which returns the continuation directly, so we do not have to “juggle” it out of the closure?

in general, i also feel like AsyncStream is really difficult to work with due to the requirement that iteration happen in the same Task that the AsyncStream was created in, even when no concurrent iteration ever takes place. this makes it hard to “subscribe” to events generated from actor objects, even when the subscription method is marked nonisolated.

2 Likes

The current api is indeed awkward, especially when the stream is just a way to produce its iterator.

Like @taylorswift, I also have a fatalError that deals with the invalid state made possible by the current api.

I do not know how the api should be enhanced, but I hope the authors of async streams will read this thread, and understand what we are after.

1 Like

I'll add that this pattern looks like a potential race to me:

let stream = AsyncThrowingStream(...) { continuation in
  let token = startSequence(feeding: continuation)
  continuation.onTermination = { @Sendable
    token.endSequence()
  }
}

In the above sample code, the termination is handled by a "token" that needs the continuation to be created.

This is the case of, say, Combine publishers: the "token" is a Cancellable returned by sink, and sink must consume the continuation so that values that are synchronously published, right on publisher subscription, are sent to continuation.yield. Those values are yielded before the cancellable is returned, and thus before onTermination could be set.

I fear that startSequence can put the continuation in a terminated state. In this case, is onTermination called? If it is not, resources are leaked. The precise behavior is not documented. A clarification would be appreciated.

And if this pattern is indeed able to miss a termination notification, what is the recommended (and fixed) pattern?

1 Like

right, it would be better to have

let (session, control):(AsyncStream<T>.Continuation, AsyncStream<T>) = 
    AsyncStream<T>.pipe()
let token:Token = .startSequence(feeding: session)
for await element:T in control 
{
}
token.endSequence()

Hello @ktoso, I'm sorry to disturb, but is it possible to have your feedback on the difficulties and questions we meet (there are several ones listed in this thread), and maybe your advice? When one or two people ask a question on the forums, there are many more silent developers in the wild that wonder as well.

@Philippe_Hausler is the author of this proposal so he might be the best person to ping here. Sorry I can’t make much time outside of my current focus area right now.

4 Likes

So AsyncStream serves some of the purpose of "subject" like things, however I think it is worth discussing other shapes of emissions. I think in the cases where the continuation needs to escape it is sometimes the case where a more subject like type might be a better fit (and perhaps more efficient too).

I think it might be interesting to look at the problem in the perspective of adding additional types to handle this. Just as much as Array is not a one-size-fits all collection and other behaviors such as Set and queues are useful data structures we should consider other asynchronous primitives beyond AsyncStream.

I am very interested in seeing what primitives are the most useful and the community feels are of immediate importance. Especially with regards to composing them into more specialized use cases.

P.S. For the record; AsyncStream and AsyncThrowingStream are designed to allow for the affordance of the continuation escaping, so the cases where it is escaped is actually a supported use (but may not be ideal as you pointed out)

1 Like

I agree, there should not be a fits every need super AsyncSequence. I wonder if there are plans to enhance AsyncSequence to a level where it can replace Combine? Basically an AsyncSequenceand a Publisher are the same thing.

So instead of speaking of replacing anything it might be good to outline the distinct needs since it would limit the discussion.

I think there are a few cases in my mind that are useful to consider generally for AsyncSequence:

There are a few missing parts of Sequence that AsyncSequence does not yet have. For example zip. We avoided this since there is a missing part to async/await that zip needs to accomplish that for asynchronous behavior - trivially one could await the first side and then await the second side... but the problem is that you really want to await both at the same time and compose the result. But if you take that path then if the first resolved value is nil or throws an error you want to immediately either return nil (knowing that the zip is not satisfiable) or throw. That obviously comes to the question of "what happens to the other side?", is it cancelled? (which I am leaning to that as an expected behavior) or does it continue execution?

In that same category as zip we also are missing enumerated. Does this even make sense for AsyncSequence? I can see reasons where it might... but is it worth the API surface?

In a similar vein to zip, what happens if we look at just getting the latest updated value? That is a category of combinators just like zip but instead focuses on the order of resolution of values. I would guess that it should behave similarly to however zip does with regards to satisfaction of the result and cancellation.

I could continue with other ideas but perhaps it might be best to start another thread than hijack this one so we can keep the community thoughts organized.

I agree, this is a huge topic. But I really think it is important to name the elephant in the room in the new thread: Should AsyncSequence replace Combine in the future, or should they coexist?

3 Likes

There is a [Pitch] Convenience Async[Throwing]Stream.makeStream methods for that from @FranzBusch

1 Like