Issue: A Stream will continue to produce elements even when its continuation has been finished

According to the documentation for AsyncThrowingStream.Continuation/finish(throwing:), the associated stream should not produce any more elements once the continuation has been finished:

"Resume the task awaiting the next iteration point by having it return nil, which signifies the end of the iteration."

and

"After calling finish, the stream enters a terminal state and doesn’t produce any additional elements."

However, this is not the observed behaviour:

When I create a (stream, continuation) tuple, populate the continuation with 1 + N elements, and then run an async for loop that finishes the continuation via finish(throwing:) after receiving the first element, the stream still produces all N elements that already reside in the underlying buffer before it eventually exits the loop.

The test below demonstrates the observed behaviour above.

My question is: is this intended behaviour, in which case the documentation could be improved to clarify it, or is it a bug in the implementation of AsyncStream/AsyncThrowingStream?

Test
import Testing

struct Test {
    
    @Test func testAsyncStreamDoesNotReturnElementsAfterContinuationFinished() async throws {
        enum Event {
            case start, finish, ping
        }
        enum State {
            case start, running, finished
        }
        
        enum Error: Swift.Error {
            case dropped(Event)
            case terminated
            case unknown
        }

        
        let (stream, continuation) = AsyncThrowingStream<Event, Swift.Error>.makeStream()
        
        continuation.onTermination = { termination in
            print(termination)
        }
        
        // Populate a few events, before we await the for loop.
        // We are not expecting an error here.
        try [.start, .finish, .ping, .ping, .ping].forEach { (event: Event) in
            let result = continuation.yield(event)
            switch result {
            case .enqueued:
                break
            case .dropped(let event):
                throw Error.dropped(event)
            case .terminated:
                throw Error.terminated
            default:
                throw Error.unknown
            }
        }
        
        // When we receive the `finish` element, we finish the continuation.
        // So I would expect no element after `finish` (i.e., the `ping`s)
        // to be produced.
        var state = State.start
        var count = 0
        loop: for try await event in stream {
            switch (event, state) {
            case (.start, .start):
                state = .running
            case (.finish, .running):
                state = .finished
                continuation.finish(throwing: nil)
                // We could break here to exit the for loop, as in the comment below:
                // break loop
                // However, I expect the loop to exit anyway because the continuation
                // has been terminated via `continuation.finish(throwing: nil)`,
                // according to the docs:
                // "After calling finish, the stream enters a terminal state
                // and doesn’t produce any additional elements."
                // So, I do not expect to receive any further elements.
            case (_, .finished):
                // Oops!
                count += 1
                Issue.record("Received an element (\(count)) (aka event: '\(event)') after the continuation has been finished.")
            case (.ping, .start):
                break
            case (.ping, .running):
                break
            case (.finish, .start):
                break
            case (.start, .running):
                break
            }
        }
    }

}

i believe this is how the type is intended to behave, though the documentation could likely be improved to clarify this. personally i've found it helpful to think of the 'continuation half' of the stream as the 'producer' and the 'iterator half' as the 'consumer'. from that perspective, the wording in the second documentation reference may make more sense:

"After calling finish, the stream enters a terminal state and doesn’t produce any additional elements."

i.e. after it's been finished, the continuation no longer admits new elements. you can observe this in your example by attempting to yield() again after termination, which will not buffer the element and instead returns YieldResult.terminated.
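A minimal sketch of that check, assuming a plain AsyncStream<Int> instead of the thread's event type (the function name yieldAfterFinish is made up for illustration). The stream is drained at the end to show that the rejected element never reaches the consumer:

```swift
// Sketch: once finish() has been called, yield(_:) no longer buffers anything.
func yieldAfterFinish() async -> (rejected: Bool, received: [Int]) {
    let (stream, continuation) = AsyncStream<Int>.makeStream()
    continuation.yield(1)      // buffered before finish()
    continuation.finish()      // the producer side is closed from here on

    // A yield after finish() reports .terminated instead of buffering the element.
    var rejected = false
    if case .terminated = continuation.yield(2) {
        rejected = true
    }

    // Drain the stream: only the element yielded before finish() shows up.
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return (rejected, received)
}
```

Calling it should give rejected == true and received == [1].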

however, the type will not drop elements that have already been buffered unless you explicitly instruct it to[1]. if there are outstanding buffered elements, those must first be 'consumed' before an iterator can 'see' the terminal state via its iteration.
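To illustrate the footnote, here is a sketch that yields the same five events as the test under different buffering policies, finishes the stream, and collects what the consumer receives. The helper drained(policy:) and the standalone Event enum are illustrative assumptions; AsyncThrowingStream offers the same .unbounded, .bufferingOldest(_:) and .bufferingNewest(_:) policies.

```swift
// Standalone copy of the thread's event type, for illustration only.
enum Event: Sendable { case start, finish, ping }

// Hypothetical helper: yield five events under `policy`, finish, then drain.
func drained(policy: AsyncStream<Event>.Continuation.BufferingPolicy) async -> [Event] {
    let (stream, continuation) = AsyncStream<Event>.makeStream(bufferingPolicy: policy)
    for event in [Event.start, .finish, .ping, .ping, .ping] {
        // The policy decides here, at yield time, whether an element is kept.
        continuation.yield(event)
    }
    continuation.finish()

    var received: [Event] = []
    for await event in stream {
        received.append(event)
    }
    return received
}
```

With .unbounded (the default) all five events arrive; with .bufferingNewest(1) each yield evicts the previously buffered element, so only the last ping is left. Note that the policy acts when elements are yielded, not when finish() is called.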

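regarding the wording of this piece of documentation on the behavior of the finish() method:

"Resume the task awaiting the next iteration point by having it return nil, which signifies the end of the iteration."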

this statement seems somewhat more confusing to me. i think the wording of the documentation is intended to convey that if a Task is suspended awaiting the next stream element and there are no currently-buffered elements, then calling finish() will have the described behavior.
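A small sketch of that case, assuming an AsyncStream<Int> with nothing buffered (suspendedConsumerGetsNil is a made-up name). The short sleep only makes it likely that the consumer is already suspended in next(); if finish() happens to run first, the result is the same, because there is nothing buffered to deliver:

```swift
// Sketch: a consumer waiting on an empty stream is resumed with nil by finish().
func suspendedConsumerGetsNil() async -> Int? {
    let (stream, continuation) = AsyncStream<Int>.makeStream()

    // The consumer waits for the first element; nothing has been yielded.
    let consumer = Task<Int?, Never> {
        var iterator = stream.makeAsyncIterator()
        return await iterator.next()
    }

    // Give the consumer time to suspend inside next() (10 ms, an arbitrary choice).
    try? await Task.sleep(nanoseconds: 10_000_000)
    continuation.finish()   // resumes the waiting consumer with nil
    return await consumer.value
}
```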

the wording in the corresponding evolution proposal for the type makes this more clear (emphasis mine):

Calling a continuation’s finish() method moves its stream into a “terminated” state. An implementor of an AsyncThrowingStream can optionally pass an error to be thrown by the iterator. After providing all buffered elements, the stream’s iterator will return nil or throw an error, as appropriate.

The first call to finish() sets the terminating behavior of the stream (either returning nil or throwing); further calls to finish() or yield(_:) once a stream is in a terminated state have no effect.
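Assuming the proposal's description matches the shipping implementation, the sketch below shows both points: the buffered elements are delivered first, then the iterator throws the error passed to the first finish(throwing:) call, and a second finish() call does not change that. Boom and drainThenThrow are illustrative names.

```swift
// Illustrative error type.
struct Boom: Error {}

func drainThenThrow() async -> (received: [Int], threwBoom: Bool) {
    let (stream, continuation) = AsyncThrowingStream<Int, Error>.makeStream()
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish(throwing: Boom())  // first call decides how the stream ends
    continuation.finish()                  // later calls have no effect

    var received: [Int] = []
    do {
        // Buffered elements come first; the error is thrown once the buffer is empty.
        for try await value in stream {
            received.append(value)
        }
        return (received, false)
    } catch {
        return (received, error is Boom)
    }
}
```

The expected result is received == [1, 2] and threwBoom == true.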

as a slight aside – if you do wish to react to finish() being invoked as soon as possible, then setting an onTermination closure on the stream's continuation should let you do so.
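A sketch of that, assuming Foundation's NSLock for a small thread-safe log (the Log class and terminationOrder function are hypothetical). As far as I can tell from the current implementation, the handler runs inside the finish() call itself, before any buffered element reaches the consumer:

```swift
import Foundation

// Lock-protected log, because onTermination is a @Sendable closure that
// is not guaranteed to run on the consumer's thread.
final class Log: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func append(_ entry: String) {
        lock.lock(); defer { lock.unlock() }
        entries.append(entry)
    }

    var snapshot: [String] {
        lock.lock(); defer { lock.unlock() }
        return entries
    }
}

func terminationOrder() async -> [String] {
    let log = Log()
    let (stream, continuation) = AsyncStream<Int>.makeStream()
    continuation.onTermination = { _ in
        log.append("onTermination")
    }
    for value in 1...3 {
        continuation.yield(value)
    }
    continuation.finish()          // assumption: onTermination runs during this call...
    for await value in stream {    // ...before the buffered elements are consumed
        log.append("received \(value)")
    }
    return log.snapshot
}
```

If that holds, the log starts with "onTermination", followed by "received 1" through "received 3". A consumer could check a flag set in this handler to skip the remaining buffered elements, but such a flag needs the same kind of synchronization as the log.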


[1] by specifying an appropriate bufferingPolicy in the initializer


Thank you for your assessment! :)

Having read the evolution proposal, which contains further hints supporting this, I agree that the observed behaviour can be considered correct. Still, the documentation could explain more clearly what happens to elements that are already buffered.

In my use case, I can work around this by using a break statement to exit the async loop explicitly (as hinted at in the comments of the test code) and so get the behaviour I want. But I still have to decide how to deal with the elements that were successfully enqueued but never handled. In my use case (inside a library), it seems safe to ignore them, but doing so could hide some other issue elsewhere (on the client side).
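One possible way to keep the break-based workaround without silently losing elements is to drive the iterator directly: stop handling at .finish, then drain the rest so it can be logged or reported. This is only a sketch; consumeUntilFinish and the standalone Event enum are hypothetical, and it assumes a single consumer, since one AsyncThrowingStream cannot be iterated concurrently from several tasks.

```swift
// Standalone copy of the thread's event type, for illustration only.
enum Event: Sendable { case start, finish, ping }

func consumeUntilFinish(
    _ stream: AsyncThrowingStream<Event, Error>,
    _ continuation: AsyncThrowingStream<Event, Error>.Continuation
) async throws -> (handled: [Event], leftover: [Event]) {
    var iterator = stream.makeAsyncIterator()

    // Phase 1: handle events until `finish` arrives, then close the producer side.
    var handled: [Event] = []
    while let event = try await iterator.next() {
        handled.append(event)
        if event == .finish {
            continuation.finish()
            break
        }
    }

    // Phase 2: drain whatever was enqueued before finish(), so it can be
    // logged or reported instead of disappearing unnoticed.
    var leftover: [Event] = []
    while let event = try await iterator.next() {
        leftover.append(event)
    }
    return (handled, leftover)
}
```

Given the buffer from the original test (start, finish, ping, ping, ping), handled would be [.start, .finish] and leftover [.ping, .ping, .ping]. Because finish() has already been called, phase 2 ends once the buffer is empty instead of waiting for more elements.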

Also, have you tried running the producer and consumer separately in their own tasks?

func runTest () async throws {
    print ()
    let sp = Test.Stream ()
    
    Task {@TestActor1 in
        try await Test.produce(sp: sp)
    }
    
    Task {@TestActor2 in
        try await Test.consume(sp: sp)
    }
    hibernate (seconds: 5)
}

import Dispatch

func hibernate (seconds s: Double) {
    let sem = DispatchSemaphore (value: 0)
    _ = sem.wait (timeout: .now() + .milliseconds(Int (1000 * s)))
}

@globalActor
actor TestActor1 {
    static let shared = TestActor1 ()
}

@globalActor
actor TestActor2 {
    static let shared = TestActor2 ()
}

struct Test {
    enum Event {
        case start, finish, ping
    }
    enum State {
        case start, running, finished
    }
    
    enum Error: Swift.Error {
        case dropped(Event)
        case terminated
        case unknown
    }

    final class Stream: Sendable {
        let (stream, continuation) = AsyncThrowingStream<Event, Swift.Error>.makeStream()
    }
    

    static func produce (sp: Stream) async throws {
        
        let continuation = sp.continuation
        
        continuation.onTermination = { termination in
            print("onTermination", termination)
        }
        
        // Populate a few events, before we await the for loop.
        // We are not expecting an error here.
        try [.start, .finish, .ping, .ping, .ping].forEach { (event: Event) in
            let result = continuation.yield(event)
            switch result {
            case .enqueued:
                break
            case .dropped(let event):
                throw Error.dropped(event)
            case .terminated:
                throw Error.terminated
            default:
                throw Error.unknown
            }
            // slow down
            hibernate(seconds: 0.001)
            // hibernate(seconds: 0.0001)
        }
    }
    
    static func consume (sp: Stream) async throws {
        let (stream, continuation) = (sp.stream, sp.continuation)
        
        // When we receive the `finish` element, we finish the continuation.
        // So I would expect no element after `finish` (i.e., the `ping`s)
        // to be produced.
        var state = State.start
        var count = 0
        loop: for try await event in stream {
            switch (event, state) {
            case (.start, .start):
                state = .running
            case (.finish, .running):
                state = .finished
                continuation.finish(throwing: nil)
                // We could break here to exit the for loop, as in the comment below:
                // break loop
                // However, I expect the loop to exit anyway because the continuation
                // has been terminated via `continuation.finish(throwing: nil)`,
                // according to the docs:
                // "After calling finish, the stream enters a terminal state
                // and doesn’t produce any additional elements."
                // So, I do not expect to receive any further elements.
            case (_, .finished):
                // Oops!
                count += 1
                print ("Received an element (\(count)) (aka event: '\(event)') after the continuation has been finished.")
            case (.ping, .start):
                break
            case (.ping, .running):
                break
            case (.finish, .start):
                break
            case (.start, .running):
                break
            }
        }
    }
}
With a 1 ms pause after each yield in produce:

// slow down
hibernate(seconds: 0.001)
// hibernate(seconds: 0.0001)

the output is:

onTermination finished(nil)

With a 0.1 ms pause after each yield in produce:

// slow down
// hibernate(seconds: 0.001)
hibernate(seconds: 0.0001)

the output is:

onTermination finished(nil)
Received an element (1) (aka event: 'ping') after the continuation has been finished.
Received an element (2) (aka event: 'ping') after the continuation has been finished.
Received an element (3) (aka event: 'ping') after the continuation has been finished.


I had only thought about extending the test to run the consumer in its own task; thank you for actually doing it :)

So, if I understand your test code correctly, you observe that:

Elements that have been successfully enqueued will be consumed, even if the continuation has been finished and the stream is in a terminal state.