Proposed change to AsyncStream termination logic

As surfaced in this thread, the current implementation of AsyncStream termination via cancellation is not strictly 'atomic': the termination handler can run before the stream's internal state has been updated to clear out pending continuations and record that the stream has terminated. This leaves open a window in which the termination handler has already run due to cancellation, but new elements can still be yielded into the stream, either while the handler is running or briefly after it returns. Termination via an explicit call to finish() does not have this behavior, which seems more appropriate to me, so I propose we unify the two code paths.
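To make the ordering concrete, here is a minimal, single-process sketch of the difference. It is a toy model, not the actual AsyncStream storage: `ToyStreamStorage`, `atomicCancellation`, `markTerminated()` and `demo(atomic:)` are hypothetical names, and the real implementation also has to manage suspended continuations. All it shows is the order of two steps: marking the state terminal and invoking the termination handler. When the handler runs first, a yield from inside it is still buffered; when the state is updated first (the ordering described above for finish()), the same yield is rejected.

```swift
import Foundation

/// Toy model of a stream's buffer and termination state (hypothetical; not the
/// real AsyncStream storage). All mutable state is guarded by `lock`.
final class ToyStreamStorage<Element> {
    private let lock = NSLock()
    private var buffer: [Element] = []
    private var terminated = false
    private var onTermination: (() -> Void)?

    /// true: mark the state terminal, then run the handler (the finish() ordering).
    /// false: run the handler first, then mark the state terminal.
    let atomicCancellation: Bool

    init(atomicCancellation: Bool) {
        self.atomicCancellation = atomicCancellation
    }

    func setOnTermination(_ handler: @escaping () -> Void) {
        lock.lock(); defer { lock.unlock() }
        onTermination = handler
    }

    /// Returns true if the element was buffered, false if the stream had terminated.
    @discardableResult
    func yield(_ element: Element) -> Bool {
        lock.lock(); defer { lock.unlock() }
        guard !terminated else { return false }
        buffer.append(element)
        return true
    }

    /// Marks the state terminal (at most once) and hands back the handler so the
    /// caller can invoke it outside the lock.
    private func markTerminated() -> (() -> Void)? {
        lock.lock(); defer { lock.unlock() }
        guard !terminated else { return nil }
        terminated = true
        let handler = onTermination
        onTermination = nil  // also breaks any reference cycle through the handler
        return handler
    }

    func finish() {
        if let handler = markTerminated() { handler() }
    }

    func cancel() {
        if atomicCancellation {
            if let handler = markTerminated() { handler() }
        } else {
            // The handler runs while the state still says "not terminated",
            // so any yield it performs is buffered.
            lock.lock()
            let handler: (() -> Void)? = terminated ? nil : onTermination
            lock.unlock()
            handler?()
            _ = markTerminated()
        }
    }

    var snapshot: [Element] {
        lock.lock(); defer { lock.unlock() }
        return buffer
    }
}

/// Mirrors the contrived example from the thread, minus the async machinery.
func demo(atomic: Bool) -> [String] {
    let storage = ToyStreamStorage<String>(atomicCancellation: atomic)
    storage.setOnTermination { _ = storage.yield("terminated") }
    storage.yield("cancel")
    storage.cancel()
    return storage.snapshot
}

print(demo(atomic: false))  // ["cancel", "terminated"]
print(demo(atomic: true))   // ["cancel"]
```

Invoking the handler outside the lock also matters in either ordering: a handler that calls back into yield would otherwise try to re-acquire a non-recursive lock.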

Is there any reason not to do this, or any concern with changing the implementation in this manner? Here's a draft of the sort of thing I have in mind.

I think that change makes sense. Before making it, though, I was wondering whether we could first unify the implementations of AsyncStream and AsyncThrowingStream so that we only have one state machine to maintain.
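One way to picture a single state machine serving both types is a core that is generic over the failure type, with the non-throwing flavor corresponding to `Failure == Never`. The sketch below is a hypothetical, single-threaded toy (no locking, no suspended continuations); `ToyStreamCore`, `ToyPoll` and the two type aliases are invented for illustration and are not the standard library's internals.

```swift
/// What a consumer observes when polling the toy core.
enum ToyPoll<Element, Failure: Error> {
    case element(Element)
    case pending          // nothing buffered yet; a real stream would suspend here
    case failed(Failure)  // terminated with an error, reported once
    case finished         // terminated, buffer drained, no remaining error
}

/// Hypothetical single state machine shared by both stream flavors.
/// Single-threaded toy: a real implementation would also need a lock and
/// would have to resume suspended consumers.
final class ToyStreamCore<Element, Failure: Error> {
    private var buffer: [Element] = []
    private var isTerminated = false
    private var pendingError: Failure?

    /// Returns true if the element was buffered, false if already terminated.
    @discardableResult
    func yield(_ element: Element) -> Bool {
        guard !isTerminated else { return false }
        buffer.append(element)
        return true
    }

    /// The non-throwing flavor can only ever pass nil here (Never? has no other value).
    func finish(throwing error: Failure? = nil) {
        guard !isTerminated else { return }
        isTerminated = true
        pendingError = error
    }

    /// In this toy, buffered elements drain first; the error (if any) follows them.
    func poll() -> ToyPoll<Element, Failure> {
        if !buffer.isEmpty { return .element(buffer.removeFirst()) }
        guard isTerminated else { return .pending }
        if let error = pendingError {
            pendingError = nil
            return .failed(error)
        }
        return .finished
    }
}

/// AsyncStream-like and AsyncThrowingStream-like flavors of the same core.
typealias ToyNonThrowingCore<Element> = ToyStreamCore<Element, Never>
typealias ToyThrowingCore<Element> = ToyStreamCore<Element, any Error>
```

With `Failure` as a generic parameter, termination logic such as the ordering change proposed above would only need to be written once.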

I can try to look into the feasibility of that, though it would introduce more change, since the current implementations differ slightly in various ways. IMO we shouldn't block on it as a prerequisite ('perfect is the enemy of the good'), though I agree it would make the implementation much nicer.

Definitely agree that we shouldn't block on it; let's split that out into a separate PR.

We do need to be very careful with changes like this, since we have to preserve the behavior of existing applications that rely on it.

Unless I'm misunderstanding, the behavior described cannot be deterministically observed and thus relied upon, yes?

I can come up with a case where the behavior would change, but it seems pretty contrived to me. If the termination handler itself writes values back into the stream, those writes would now fail when the stream is terminated due to task cancellation, whereas previously they would be successfully buffered (assuming nothing else is racing to end the stream, etc.). For example:

func contrived() async {
    let (output, input) = AsyncStream<String>.makeStream()
    // The termination handler tries to write one more value into the stream.
    input.onTermination = { _ in
        input.yield("terminated")
    }

    input.yield("cancel")

    var results = [String]()
    for await element in output {
        results.append(element)

        if element == "cancel" {
            // Cancel the task iterating the stream; task cancellation then
            // terminates the stream, which invokes onTermination.
            withUnsafeCurrentTask { $0?.cancel() }
        }
    }

    print(results == ["cancel", "terminated"])
    // true before proposed change, false after
}

I assumed Philippe was advising caution about the suggested broader refactor of the stream internals, aimed at reducing the duplication within the implementation.

That reminds me of an issue Combine had (which @Philippe_Hausler may remember us discussing): the major OS release after Combine's initial release changed the cancellation behavior to shut down streams immediately, which prevented values from being delivered as part of cancellation, such as the automatic cancellation that follows a failure. I think that change was made for correctness, and the same may be true here.

That is part of it. The other part is that behavioral changes can be tricky to ship, because some behaviors are more susceptible to regressions than others. Altering order is often a tricky one: it introduces a difference that folks may not expect, and it can also be hard to track down what destabilized the build.

For the record: I am not advocating against cleaning up the implementation now that we have better tools like typed throws, nor am I advocating against changing behavior that might potentially be an issue.

On the contrary, I am suggesting that tests and the like be added to make those changes more robust against things that might not be immediately obvious. I would also suggest factoring the changes into small, identifiable steps with clearly scoped capabilities, so that if needed we can put in a check to decide whether to fall back to the old behavior.
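A fallback check of the kind described might look something like the following. This is purely illustrative: the `CancellationOrdering` type and the environment variable name are hypothetical, and whatever mechanism were actually chosen could differ.

```swift
import Foundation

/// Hypothetical selector between the two cancellation orderings discussed in this thread.
enum CancellationOrdering {
    /// Run the termination handler before marking the stream terminal (old behavior).
    case legacy
    /// Mark the stream terminal first, then run the handler (same as finish()).
    case atomic

    /// Hypothetical opt-out switch; the variable name is invented for illustration.
    static let legacyOptOutKey = "TOY_ASYNCSTREAM_LEGACY_CANCELLATION"

    /// Defaults to the new ordering unless the opt-out is explicitly set to "1".
    static func current(
        environment: [String: String] = ProcessInfo.processInfo.environment
    ) -> CancellationOrdering {
        environment[legacyOptOutKey] == "1" ? .legacy : .atomic
    }
}

// Usage: pick the ordering once and branch on it in the termination path.
switch CancellationOrdering.current() {
case .legacy: print("using legacy cancellation ordering")
case .atomic: print("using atomic cancellation ordering")
}
```

Keeping the old path selectable behind a single check like this is one way to make a regression in ordering easy to bisect and to back out.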
