Yes, it is tricky if you would like to cancel the stream automatically. I played a bit and this works but not sure if this is correct.
I cancel the withLifetime's Task
inside the loop - continue
is required to allow the stream to trigger onTermination
. If I use return
instead, the stream does not get a chance to receive onTermination
so even Task
finishes the stream continues forever yielding values. Actually, this is a good learning for me how onTermination
works.
I added prints and simply tested how it behaves:
@discardableResult
func withLifetime<Instance, OperationError, Stream>(
of object: sending Instance,
consuming stream: Stream,
forEach operation: sending @escaping (
_ object: Instance,
_ element: Stream.Element
) throws(OperationError) -> Void
) -> Task<Void, any Error>
where Instance: AnyObject,
OperationError: Error,
Stream: AsyncSequence & Sendable,
Stream.Element: Sendable
{
var task: Task<Void, any Error>?
task = Task { [weak object] in
var isCancelled = false
for try await element in stream {
print("withLifetime Task.isCancelled", Task.isCancelled)
guard !Task.isCancelled,
let object
else {
print("withLifetime task.cancel")
task?.cancel()
continue
}
try operation(object, element)
}
}
return task!
}
@MainActor
class Foo {
var counter = 0
deinit {
print("deinit")
}
}
final class PlaygroundTests: XCTestCase {
@MainActor
func test() async throws {
var instance: Foo! = Foo()
let stream = AsyncStream { continuation in
let task = Task {
var count = 0
while !Task.isCancelled {
try await Task.sleep(for: .seconds(1))
count += 1
print("yield", count)
continuation.yield(count)
}
}
continuation.onTermination = { _ in
print("onTermination")
task.cancel()
}
}
let task = withLifetime(of: instance, consuming: stream, forEach: { this, element in
print("consume", this, element)
})
try? await Task.sleep(for: .seconds(2))
print("nil out")
instance = nil
try? await Task.sleep(for: .seconds(5))
print("end")
}
}
Output:
yield 1
withLifetime Task.isCancelled false
consume __lldb_expr_360.Foo 1
nil out
deinit
yield 2
withLifetime Task.isCancelled false
withLifetime task.cancel
onTermination
end