Should AsyncStream terminate if its continuation goes out of scope?

as i've had AsyncStream termination on the mind recently, it occurred to me that if a stream's continuation "goes out of scope", it becomes impossible to terminate iteration by any means other than task cancellation (assuming the iteration logic itself doesn't somehow break from the loop). i suppose such cases could be attributed to "user error" but it seems like this situation could be detected and used to induce the stream to end.

i don't think the evolution doc says what should occur in such cases so i'm not sure if this would be a reasonable thing to change outside of the evolution process, but i'd be interested to hear what people think about it regardless. and just to make the scenario concrete, it's something like:

func drop_continuation() async {
  let stream = AsyncStream<Int> { continuation in
    continuation.yield(1)
  }
  // continuation never stored, so cannot be used to finish the stream

  // waits "forever" (or until task is cancelled)
  for await i in stream {
    print(i)
  }
}
// could change the implementation to trigger stream termination if there are
// no extant references to its continuation

can you think of any downsides to adopting such a change?

4 Likes

Sounds good to me. The proposal only mentions that when the stream itself goes out of scope, it gets canceled and its onTermination closure fires. In my opinion, having it detect what is essentially a Never expression fits nicely into the API from a consistency and user-expectation standpoint. What kind of implementation did you have in mind?

i was imagining we'd do something like this or this. i.e. put a private reference type inside the continuation that will trigger termination on deinit. i did a quick test of something along those lines and it seemed like it'd probably work.

could you elaborate on what you mean by this?

I wasn’t very precise here, my bad. What I meant is that an AsyncStream that discards its continuation is similar to a throwing Never function (on second thought, I need to think about it more). I was essentially just looking for a better phrase than “waits forever.”

Gotcha, thanks for pointing me in the right direction. I’m currently trying out different approaches for a possible reimplementation of AsyncStream, and I wasn’t sure what the purpose of this was.

The idea sounds solid. Although, I’m not certain whether we need to turn this into an evolution proposal, since we are somewhat changing its behavior. However, I couldn’t think of a valid use case for the current behavior either.

Thinking about other situations where the stream might be used incorrectly. Currently, you can set BufferingPolicy to one of the bounded cases with an associated value of 0 or less without any diagnostics. I think we should at least warn users about this. What do you think?

seems like a buffer limit of 0 or negative will behave the same, and basically mean all elements will be dropped unless there is a continuation suspended when a value is yielded. that is maybe a little unusual but i don't think it's necessarily a use case we'd want to cut off or imply is somehow 'wrong' (seems to me it's kind of like a PassthroughSubject from Combine). well, i think it makes sense to support passing a limit of zero at least. IMO it's not obvious how a negative value should be treated, but i think that might be more appropriate to handle with documentation rather than diagnostics.

1 Like

I just figured it out myself, I didn’t think about that you can obviously start the consumer first before producing elements, my bad again. I think calling out the behavior more explicitly in the documentation is a good idea, i.e., a “negative” buffering policy behaves no different from a zero-based one, which in turn behaves like a bufferless subject.

1 Like

That would have to track the extent of both the continuation and the stream it self. That would likely add both an extra complication for the implementation and have a slight performance impact memory wise at best.

It is however an interesting concept; Perhaps what would be interesting is developing some sort of ruleset on what things should have this behavior and where that line is drawn.

to clarify – the continuation already effectively tracks the extent of the stream (or, well, the stream plus any extant iterators) since it holds a reference to the internal storage. so IIUC we'd just need the marginal addition of something that tracks the continuation as well.

i think i see where the memory impact would come from (need a new field and probably heap allocation), but is there some other complication you foresee?

curious what you mean by this more concretely.

Your analysis is spot on per the additions - I was thinking that perhaps what we should do before embarking on a change like this is to figure out what the ruleset should be; because there are a number of root AsyncSequence types that exist and it feels like there should perhaps be a suggestion/rule of what folks should expect w.r.t. termination - is it expected that a termination should happen if it can determine if there never could be a value produced? Obviously we wont want a case where it would determine if a value would never be produced would be generally a termination, because there are times that awaiting indefinitely is expected.

There is also a half-step version where we could emit a runtime issue if we detect that the continuation is no longer referenced but the iteration is active and in a suspended state. Because just terminating might just endorse a programmer error.

So in short; I think a brief writeup on what the expectation of generally any AsyncSequence on its expectation of behavior like this would be useful to help guide design for other new AsyncSequences as well as other systems like them.

3 Likes

Could you elaborate on this?

Also, would this be more like a design document (e.g, hosted in Swift Asynchronous Algorithms), or should it become part of general user-facing documentation? I like the idea because I strongly believe that predictability is an important part of concurrency.

If you take the analogy of an UnsafeContinuation then it is like awaiting that continuation with never resuming. That is expected to await indefinitely. Likewise for an AsyncStream if you await the next element of its iterator and never yield a value then it should await indefinitely.

I think documentation is generally a good thing no matter where it is; but I would say that my suggestion is more so of a general documentation thing. After AsyncSequence was introduced we had some extra refinements around the behavior - Revise AsyncSequence proposal to note end-of-iteration behavior (#1312) · swiftlang/swift-evolution@f4045ed · GitHub. In that refinement we claimed that all implementations of AsyncIteratorProtocol MUST return nil after it is terminal either from a thrown error or a nil return termination. I think that we can add perhaps a suggestion outlining the concept of termination SHOULD gracefully terminate when possible if there could never be any value emitted.

We cannot however promise that the termination would ever happen immediately or that it would happen with all implementations because you may easily run across the issue that we don't have any sort of "garbage collection" that marks and sweeps as a background process. So I think the best that could be done is claim it in the case of AsyncStream and AsyncThrowingStream to be a best effort termination that upon the next time that it is awaited (or if it is currently awaiting) that if the continuation has no more internal references then it would be marked as terminal for when the buffer is emptied.

1 Like