AsyncSequence, break, and cancellation

Thanks @Philippe_Hausler and @Jon_Shier, I think I have a better understanding of async loop. I'm sorry I have used "cancellation" is an ambiguous and misleading fashion.

I now think that the bug (clean up is not performed when a loop is broken and the iterator falls out of reach) happens when the iterator does not perform cleanup when it is deallocated (or some of its lifetime-tracking property is deallocated).

As a support for this claim, I ran the sample code below, which defines two raw AsyncSequences, one with a struct iterator, one with a class iterator. The struct iterator has no way to perform cleanup, because, as @Philippe_Hausler says, breaking a loop is not a cancellation (after all, the iterator may be used later). Only the class iterator can, when it is deallocated.

This reveals a bug in Combine.Publisher.values, which can, and should perform cleanup (cancel the Combine subscription) when its iterator is deallocated.

I also think that AsyncStream/AsyncThrowingStream suffer from the same bug - I'll look for further evidence and follow up.

// Prints:
//
// Test class iterator
// 2021-10-14 06:05:28 +0000
// 2021-10-14 06:05:29 +0000
// 2021-10-14 06:05:30 +0000
// CLEANUP
// LOOP BROKEN
//
// Test struct iterator
// 2021-10-14 06:05:31 +0000
// 2021-10-14 06:05:32 +0000
// 2021-10-14 06:05:33 +0000
// LOOP BROKEN

import Foundation

struct TimerWithClassIterator: AsyncSequence {
    typealias Element = Date
    
    final class AsyncIterator: AsyncIteratorProtocol {
        func next() async throws -> Date? {
            if Task.isCancelled {
                print("CANCELLED")
                return nil
            }
            try await Task.sleep(nanoseconds: 1_000_000_000)
            return Date()
        }

        deinit {
            print("CLEANUP")
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

struct TimerWithStructIterator: AsyncSequence {
    typealias Element = Date
    
    struct AsyncIterator: AsyncIteratorProtocol {
        func next() async throws -> Date? {
            if Task.isCancelled {
                print("CANCELLED")
                return nil
            }
            try await Task.sleep(nanoseconds: 1_000_000_000)
            return Date()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

func brokenLoop<S: AsyncSequence>(_ s: S) async throws {
    var counter = 0
    for try await date in s {
        print(date)
        counter += 1
        if counter == 3 { break }
    }
    print("LOOP BROKEN")
}

@main
struct AsyncApp {
    static func main() async throws {
        print("Test class iterator")
        try await brokenLoop(TimerWithClassIterator())
        
        print("Test struct iterator")
        try await brokenLoop(TimerWithStructIterator())

        // Give time to the runtime
        await Task.sleep(10 * 1_000_000_000)
    }
}