For the sake of future readers, I might offer a variation of @Wouter01 ās answer; perhaps a more idiomatic solution would be an onTermination
clause that only cancels the work if the state
is .cancelled
. For example, consider a sequence that yields 100 values. It might look like:
func integers() -> AsyncStream<Int> {
AsyncStream { continuation in
let task = Task {
for n in 0 ..< 100 {
continuation.yield(n)
try await Task.sleep(for: .seconds(1))
}
continuation.finish()
}
continuation.onTermination = { state in
if case .cancelled = state { task.cancel() }
}
}
}
This will finish
the stream if it finishes yielding values, and will only cancel
the asynchronous work if the sequence, itself, was cancelled. But I personally always check the state
so that I donāt bother cancelling the task if it may have finished on its own, in the course of its natural lifecycle. (And, yes, cancelling a task that has already finished is a NOOP, so checking the state is technically not necessary, but I think this pattern makes the intent more clear.)
I also might be inclined to avoid the name makeStream
to avoid confusion with the AsyncStream
static method of the same name. In this case, I might call the asynchronous sequence of integer values integers
, or something like that.
Now, in your example of a never-ending sequence, this is a distinction without difference, as you might not need to ever finish
, nor the check of the state
of cancelled
in the onTermination
closure (as cancellation is the only way it would ever end), but I offer this rendition for the sake of clarity.
@nonameplum asked:
I will let @FranzBusch answer that question, but he is right, that in general, we are well advised to remain within structured concurrency when we can (so we enjoy automatic cancellation propagation). But in this example, unstructured concurrency is fine (as long as you implement cancellation logic, like above). The typical unstructured concurrency anti-pattern is where one uses Task {ā¦}
(or Task.detached {ā¦}
) without implementing cancellation logic. (And you will see that sort of sloppy implementation when perusing Task {ā¦}
examples online.)
But as long as you do handle the cancellation flow, unstructured concurrency is fine. This is why we have unstructured concurrency, to afford us this sort of fine-grained control. Just remember that when using unstructured concurrency, the burden of handling cancellation rests on your shoulders. But, IMHO, in this example, the unstructured concurrency with explicit cancellation logic is fine. The typical āprefer structured concurrency where you canā advice does not apply in this case.
If you really wanted to use makeStream(of:bufferingPolicy:)
to wrap this asynchronous task, it might look like:
func integers() -> AsyncStream<Int> {
let (stream, continuation) = AsyncStream<Int>.makeStream()
let task = Task {
for n in 0 ..< 100 {
continuation.yield(n)
try await Task.sleep(for: .seconds(1))
}
continuation.finish()
}
continuation.onTermination = { state in
if case .cancelled = state { task.cancel() }
}
return stream
}
The idea would be that you would still use an onTermination
clause to manually stop the asynchronous work when the sequence is cancelled. In this particular case, I do not think this offers much benefit over the AsyncStream(_:bufferingPolicy:_:)
pattern, but I include this for the sake of completeness. Personally, I use this makeStream
pattern when integrating with legacy patterns (completion handlers, delegate methods, etc.), where it is useful to save the AsyncStream.Continuation
in its own property. But, technically, one can use it here, too.
I guess if we are contemplating other patterns, for the sake of completeness, another alternative is to implement your own AsyncSequence
, from scratch. But thatās more complicated and doesnāt offer too much value here, IMHO. There are also other variations where you might use swift-async-algorithms, which has first-class cancellation support, but I think that is also beyond the scope of the question.