I was wondering whether it is ever valid to NOT terminate an AsyncSequence when the consuming task is cancelled, but instead to throw a CancellationError and allow the sequence to continue being iterated from another task.
The documentation for AsyncIteratorProtocol states that once an AsyncSequence has terminated or thrown an error, next() should always return nil afterwards:
The iterator returns nil to indicate the end of the sequence. After returning nil (or throwing an error) from next(), the iterator enters a terminal state, and all future calls to next() must return nil.
The example I am thinking about looks like this, and on the surface it appears to work well:
```swift
class CountingAsyncSequence: AsyncSequence, AsyncIteratorProtocol {
    typealias Element = Int
    var val = 0

    func makeAsyncIterator() -> CountingAsyncSequence { self }

    func next() async throws -> Int? {
        try await Task.sleep(for: .milliseconds(1)) // checks for cancellation
        val += 1
        return val
    }
}

func consume(sequence: CountingAsyncSequence, for duration: Duration) async throws {
    let task = Task {
        do {
            for try await value in sequence { print(value) }
        } catch is CancellationError {
            print("Consumer cancelled")
        } catch {
            print("Some other error \(error)")
            throw error
        }
    }
    try await Task.sleep(for: duration)
    task.cancel()
    _ = try await task.value
}

let seq = CountingAsyncSequence()
Task {
    print("--- Consume 1")
    try await consume(sequence: seq, for: .milliseconds(50))
    print("--- Consume 2")
    try await consume(sequence: seq, for: .milliseconds(50))
}
```
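To make the conflict with the quoted rule concrete, here is a minimal sketch (not from the original post) that probes a condensed copy of the class. Assumptions: the type is renamed `SelfIteratingCounter`, the 1 ms sleep is replaced by `Task.checkCancellation()` so the result does not depend on timing, and the `@unchecked Sendable` conformance only exists to keep strict-concurrency checking quiet for the demo. Because `makeAsyncIterator()` returns `self`, the iterator that threw in the cancelled task is the same object the next caller uses, so its next call yields 1 rather than nil.

```swift
// Condensed copy of the first CountingAsyncSequence (renamed for this sketch).
// Task.checkCancellation() stands in for the 1 ms sleep so the demo is deterministic.
// @unchecked Sendable is an assumption for the demo only: the object is never
// used by two tasks at the same time here.
final class SelfIteratingCounter: AsyncSequence, AsyncIteratorProtocol, @unchecked Sendable {
    typealias Element = Int
    private var val = 0

    func makeAsyncIterator() -> SelfIteratingCounter { self }

    func next() async throws -> Int? {
        try Task.checkCancellation()
        val += 1
        return val
    }
}

// Calls next() once from a task that is already cancelled, then once more from
// the caller's (uncancelled) context on the very same iterator object.
func probe(_ sequence: SelfIteratingCounter) async throws -> (threwCancellation: Bool, nextAfterThrow: Int?) {
    let iterator = sequence.makeAsyncIterator()
    let task = Task { () async -> Bool in
        // Wait until cancel() below has landed, so next() runs in a cancelled task.
        while !Task.isCancelled { await Task.yield() }
        do {
            _ = try await iterator.next()
            return false
        } catch is CancellationError {
            return true
        } catch {
            return false
        }
    }
    task.cancel()
    let threw = await task.value
    // The documented contract requires nil here; this type returns 1 instead.
    let after = try await iterator.next()
    return (threw, after)
}

let result = try await probe(SelfIteratingCounter())
print("threw CancellationError:", result.threwCancellation)
print("next() after the throw:", result.nextAfterThrow.map { String($0) } ?? "nil")
```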
It is quite compelling to allow this behaviour as it feels like the cancellation of a consumer task shouldn't automatically terminate the stream.
My question is: is this a sane thing to do, and is it likely to be outlawed in the future? The only alternative I can think of is to return an enum from the stream instead, which indicates cancellation explicitly:
```swift
enum StreamValue {
    case realValue(Int)
    case consumerCancelled
}
```
... but I like this approach less, as it feels unnatural and it is a pain to wrap and unwrap values this way at the call site.
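For comparison, a rough sketch of what the enum alternative looks like end to end. Everything here beyond the `StreamValue` enum is hypothetical: `CountingValueSequence` is an assumed variant of the first class that reports cancellation as an element instead of throwing (so its iterator never enters a terminal state), and `collectValues(from:limit:)` is an illustrative consumer. It shows the call-site cost the post describes: every element must be unwrapped, and the loop has to treat `.consumerCancelled` as its own stop signal.

```swift
enum StreamValue: Equatable {
    case realValue(Int)
    case consumerCancelled
}

// Hypothetical variant of the first CountingAsyncSequence: cancellation is
// reported as an element instead of an error. @unchecked Sendable is an
// assumption for this single-consumer demo.
final class CountingValueSequence: AsyncSequence, AsyncIteratorProtocol, @unchecked Sendable {
    typealias Element = StreamValue
    private var val = 0

    func makeAsyncIterator() -> CountingValueSequence { self }

    func next() async -> StreamValue? {
        if Task.isCancelled { return .consumerCancelled }
        val += 1
        return .realValue(val)
    }
}

// Every element is unwrapped; .consumerCancelled is treated as "stop here".
func collectValues(from sequence: CountingValueSequence, limit: Int) async -> [Int] {
    var received: [Int] = []
    for await item in sequence {
        switch item {
        case .realValue(let value):
            received.append(value)
            if received.count == limit { return received }
        case .consumerCancelled:
            return received
        }
    }
    return received
}

// A cancelled consumer stops immediately; a later consumer resumes the count.
func demoEnum() async -> (cancelledRun: [Int], resumedRun: [Int]) {
    let sequence = CountingValueSequence()
    let task = Task { () async -> [Int] in
        while !Task.isCancelled { await Task.yield() }
        return await collectValues(from: sequence, limit: 3)
    }
    task.cancel()
    let cancelledRun = await task.value
    let resumedRun = await collectValues(from: sequence, limit: 3)
    return (cancelledRun, resumedRun)
}

let runs = await demoEnum()
print(runs.cancelledRun, runs.resumedRun)
```

The cancelled run collects nothing and the resumed run continues with 1, 2, 3, without any iterator ever throwing.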
Nothing in the language or the protocol currently enforces the behaviour you quoted, but it is the expected behaviour. Like normal Sequences, AsyncSequences are expected to keep returning nil after they have thrown or returned nil. This is a very important invariant that the language relies upon and that other algorithms build on top of.
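One way to see what "keep returning nil" means in practice is a small adapter that latches into a terminal state. This is a sketch only: `TerminalLatchingIterator` and the toy `FlakyCounter` base are hypothetical names, not standard-library types. Once the base returns nil or throws, the adapter never consults the base again, which is exactly the guarantee downstream algorithms assume.

```swift
// Hypothetical adapter (not a standard-library type): after the base returns
// nil or throws, every later call to next() returns nil without touching the base.
struct TerminalLatchingIterator<Base: AsyncIteratorProtocol>: AsyncIteratorProtocol {
    private var base: Base
    private var finished = false

    init(_ base: Base) { self.base = base }

    mutating func next() async throws -> Base.Element? {
        if finished { return nil }
        do {
            guard let element = try await base.next() else {
                finished = true
                return nil
            }
            return element
        } catch {
            finished = true
            throw error
        }
    }
}

// Hypothetical toy base that throws on its second call and would otherwise
// keep counting, i.e. it does not honour the contract on its own.
struct FlakyCounter: AsyncIteratorProtocol {
    struct Boom: Error {}
    var calls = 0

    mutating func next() async throws -> Int? {
        calls += 1
        if calls == 2 { throw Boom() }
        return calls
    }
}

// Drives the wrapped iterator four times and records each outcome.
func demoLatch() async -> [String] {
    var iterator = TerminalLatchingIterator(FlakyCounter())
    var log: [String] = []
    for _ in 0..<4 {
        do {
            if let value = try await iterator.next() {
                log.append("value \(value)")
            } else {
                log.append("nil")
            }
        } catch {
            log.append("threw")
        }
    }
    return log
}

let log = await demoLatch()
print(log) // ["value 1", "threw", "nil", "nil"]
```

Without the adapter, the third call would return 3 instead of nil.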
I think a multi-consumer sequence isn't quite what I need, as I only ever have a single consumer at any one time; I just need the ability to restart iteration after cancellation.
I was thinking about this a bit more, and it's technically the iterator that must produce nil forever after termination, not the sequence itself. So I think by separating the iterator and the sequence I should get the behaviour I need.
This satisfies the invariant that an existing iterator's next() keeps returning nil once it has finished or thrown, while a freshly created iterator allows the sequence to continue being consumed. Do you see any issues with this approach (code below)?
```swift
actor CountingAsyncSequence: AsyncSequence {
    typealias Element = Int

    private var val: Int = 0

    private func getNext() throws -> Int? {
        val += 1
        return val
    }

    nonisolated func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(sequence: self)
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        private let sequence: CountingAsyncSequence
        private var finished = false

        fileprivate init(sequence: CountingAsyncSequence) {
            self.sequence = sequence
        }

        mutating func next() async throws -> Int? {
            if finished {
                // Satisfy the requirement that a finished iterator always returns nil
                return nil
            }
            do {
                try await Task.sleep(for: .milliseconds(1)) // checks for cancellation
                let value = try await sequence.getNext()
                if value == nil {
                    finished = true
                }
                return value
            } catch {
                // Any error (cancellation or one thrown by getNext()) is terminal
                // for this iterator; a new iterator can still be created.
                finished = true
                throw error
            }
        }
    }
}

func consume(sequence: CountingAsyncSequence, for duration: Duration) async throws {
    let task = Task {
        do {
            for try await value in sequence { print(value) }
        } catch is CancellationError {
            print("Consumer cancelled")
        } catch {
            print("Some other error \(error)")
            throw error
        }
    }
    try await Task.sleep(for: duration)
    task.cancel()
    _ = try await task.value
}

let seq = CountingAsyncSequence()
Task {
    print("--- Consume 1")
    try await consume(sequence: seq, for: .milliseconds(50))
    print("--- Consume 2")
    try await consume(sequence: seq, for: .milliseconds(50))
}
```
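A deterministic check of both halves of that claim might look like the sketch below (not from the thread). Assumptions: the actor is renamed `RestartableCounter`, the 1 ms sleep is replaced by `Task.checkCancellation()` so the outcome does not depend on timing, and `getNext()` is made non-throwing since it never throws. The first consumer sees 1 and 2, the cancelled consumer gets a CancellationError followed by nil from the same iterator, and a fresh iterator continues with 3 and 4.

```swift
// Condensed copy of the actor-based CountingAsyncSequence (renamed for this sketch).
actor RestartableCounter: AsyncSequence {
    typealias Element = Int
    private var val = 0

    private func getNext() -> Int? {
        val += 1
        return val
    }

    nonisolated func makeAsyncIterator() -> AsyncIterator { AsyncIterator(sequence: self) }

    struct AsyncIterator: AsyncIteratorProtocol {
        private let sequence: RestartableCounter
        private var finished = false

        fileprivate init(sequence: RestartableCounter) { self.sequence = sequence }

        mutating func next() async throws -> Int? {
            // Terminal state: after nil or a throw, this iterator stays at nil.
            if finished { return nil }
            do {
                try Task.checkCancellation() // stand-in for the 1 ms sleep
                guard let value = await sequence.getNext() else {
                    finished = true
                    return nil
                }
                return value
            } catch {
                finished = true
                throw error
            }
        }
    }
}

// Runs three consumers one after another against the same counter.
func demoRestart() async throws -> (first: [Int], cancelled: [String], second: [Int]) {
    let counter = RestartableCounter()

    // Consumer 1: uncancelled; pulls two values from its own iterator.
    var iterator1 = counter.makeAsyncIterator()
    var first: [Int] = []
    for _ in 0..<2 {
        if let value = try await iterator1.next() { first.append(value) }
    }

    // Consumer 2: a task that is cancelled before it pulls anything.
    let task = Task { () async -> [String] in
        while !Task.isCancelled { await Task.yield() }
        var iterator2 = counter.makeAsyncIterator()
        var log: [String] = []
        for _ in 0..<2 {
            do {
                let value = try await iterator2.next()
                log.append(value.map { "value \($0)" } ?? "nil")
            } catch is CancellationError {
                log.append("cancelled")
            } catch {
                log.append("other error")
            }
        }
        return log
    }
    task.cancel()
    let cancelled = await task.value

    // Consumer 3: a fresh iterator in an uncancelled context continues the count.
    var iterator3 = counter.makeAsyncIterator()
    var second: [Int] = []
    for _ in 0..<2 {
        if let value = try await iterator3.next() { second.append(value) }
    }
    return (first, cancelled, second)
}

let outcome = try await demoRestart()
print(outcome.first, outcome.cancelled, outcome.second)
```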