According to the documentation for AsyncThrowingStream.Continuation.finish(throwing:), the associated stream should not produce any further elements once the continuation has been finished:
"Resume the task awaiting the next iteration point by having it return nil, which signifies the end of the iteration."
and
"After calling finish, the stream enters a terminal state and doesn’t produce any additional elements."
However, this is not the observed behaviour:
When I create a (stream, continuation) tuple, yield 1 + N elements to the continuation, and then run an async for loop that calls finish(throwing:) on the continuation after receiving the first element, the stream still produces all N elements that already reside in the underlying buffer before the loop eventually exits.
The test below demonstrates this behaviour.
My question: is this the intended behaviour, in which case the documentation could be improved to clarify it, or is it a bug in the implementation of AsyncStream/AsyncThrowingStream?
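Reduced to its essentials, the scenario looks roughly like the following. This is only a minimal sketch, assuming Swift 5.9 or later (for makeStream()); the function name collectAfterFinish is a hypothetical helper of mine, not part of any API.

```swift
// Minimal sketch of the scenario; `collectAfterFinish` is a hypothetical helper name.
// Assumes Swift 5.9+ for `makeStream()`.
func collectAfterFinish() async throws -> [Int] {
    let (stream, continuation) = AsyncThrowingStream<Int, Error>.makeStream()

    // Buffer 1 + N elements (here N = 2) before iteration starts.
    for value in 1...3 {
        continuation.yield(value)
    }

    var received: [Int] = []
    for try await value in stream {
        received.append(value)
        if value == 1 {
            // Finish after the first element; deliberately no `break`.
            continuation.finish(throwing: nil)
        }
    }
    return received
}
```

With the behaviour described above, received ends up containing all three values rather than only the first one.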
Test
import Testing

struct Test {
    @Test func testAsyncStreamDoesNotReturnElementsAfterContinuationFinished() async throws {
        enum Event {
            case start, finish, ping
        }
        enum State {
            case start, running, finished
        }
        enum Error: Swift.Error {
            case dropped(Event)
            case terminated
            case unknown
        }

        let (stream, continuation) = AsyncThrowingStream<Event, Swift.Error>.makeStream()
        continuation.onTermination = { termination in
            print(termination)
        }

        // Populate a few events before we await the for loop.
        // We are not expecting an error here.
        try [.start, .finish, .ping, .ping, .ping].forEach { (event: Event) in
            let result = continuation.yield(event)
            switch result {
            case .enqueued:
                break
            case .dropped(let event):
                throw Error.dropped(event)
            case .terminated:
                throw Error.terminated
            default:
                throw Error.unknown
            }
        }

        // When receiving the element `finish`, we finish the continuation.
        // So I would expect that any element after `finish` (i.e., the `ping`s)
        // will not be produced.
        var state = State.start
        var count = 0
        loop: for try await event in stream {
            switch (event, state) {
            case (.start, .start):
                state = .running
            case (.finish, .running):
                state = .finished
                continuation.finish(throwing: nil)
                // We could break here to exit the for loop, like in the comment below:
                // break loop
                // However, I expect the loop to exit anyway because of the termination
                // of the continuation via `continuation.finish(throwing: nil)`,
                // according to the doc:
                // "After calling finish, the stream enters a terminal state
                // and doesn’t produce any additional elements."
                // So, I do not expect to receive any further elements.
            case (_, .finished):
                // Oops!
                count += 1
                Issue.record("Received an element (\(count)) (aka event: '\(event)') after the continuation has been finished.")
            case (.ping, .start):
                break
            case (.ping, .running):
                break
            case (.finish, .start):
                break
            case (.start, .running):
                break
            }
        }
    }
}