AsyncSequence version of notifications is causing memory leaks

Yes, it is tricky if you would like to cancel the stream automatically. I played a bit and this works but not sure if this is correct.

I cancel the withLifetime's Task inside the loop - continue is required to allow the stream to trigger onTermination . If I use return instead, the stream does not get a chance to receive onTermination so even Task finishes the stream continues forever yielding values. Actually, this is a good learning for me how onTermination works.

I added prints and simply tested how it behaves:

@discardableResult
func withLifetime<Instance, OperationError, Stream>(
    of object: sending Instance,
    consuming stream: Stream,
    forEach operation: sending @escaping (
        _ object: Instance,
        _ element: Stream.Element
    ) throws(OperationError) -> Void
) -> Task<Void, any Error>
where Instance: AnyObject,
      OperationError: Error,
      Stream: AsyncSequence & Sendable,
      Stream.Element: Sendable
{
    var task: Task<Void, any Error>?
    task = Task { [weak object] in
        var isCancelled = false
        for try await element in stream {
            print("withLifetime Task.isCancelled", Task.isCancelled)
            guard !Task.isCancelled,
                  let object
            else {
                print("withLifetime task.cancel")
                task?.cancel()
                continue
            }

            try operation(object, element)
        }
    }
    return task!
}

@MainActor
class Foo {
    var counter = 0

    deinit {
        print("deinit")
    }
}

final class PlaygroundTests: XCTestCase {
    @MainActor
    func test() async throws {
        var instance: Foo! = Foo()

        let stream = AsyncStream { continuation in
            let task = Task {
                var count = 0
                while !Task.isCancelled {
                    try await Task.sleep(for: .seconds(1))
                    count += 1
                    print("yield", count)
                    continuation.yield(count)
                }
            }

            continuation.onTermination = { _ in
                print("onTermination")
                task.cancel()
            }
        }

        let task = withLifetime(of: instance, consuming: stream, forEach: { this, element in
            print("consume", this, element)
        })

        try? await Task.sleep(for: .seconds(2))

        print("nil out")
        instance = nil

        try? await Task.sleep(for: .seconds(5))
        print("end")
    }
}

Output:

yield 1
withLifetime Task.isCancelled false
consume __lldb_expr_360.Foo 1
nil out
deinit
yield 2
withLifetime Task.isCancelled false
withLifetime task.cancel
onTermination
end