AsyncSequence, break, and cancellation

Hello,

I can't figure out how to trigger the cancellation of an async sequence from a break in an async for loop. Am I holding it wrong, or should I report an issue?

Some async sequences have to perform clean up when iteration has stopped. I'm interested in making sure this clean up is performed when I break an async for loop.

In the example below, I'll use an async sequence built from a Combine publisher that logs the cancellation of its subscriptions, because it was the quickest way for me to produce a sample code.

This is my async sequence below. It prints CANCEL when it is cancelled:

import Combine
import Foundation

// Forever produces a Date every second
let dates = Timer
    .publish(every: 1, tolerance: 0, on: .main, in: .default)
    .autoconnect()
    .handleEvents(receiveCancel: { print("CANCEL") })
    .values

Next, I define an async function that loops over this sequence, and breaks after three iterations:

func brokenLoop() async {
    var counter = 0
    for await date in dates {
        print(date)
        counter += 1
        if counter == 3 { break }
    }
    print("END")
}

Finally, I want to see the cancellation in action, so I setup a tiny app:

// First app prints:
// 2021-10-13 16:40:54 +0000
// 2021-10-13 16:40:55 +0000
// 2021-10-13 16:40:56 +0000
// END
@main
struct AsyncApp {
    static func main() async {
        do {
            let task = Task {
                await brokenLoop()
            }
            await task.value
        }
        
        // Give time to the runtime
        await Task.sleep(10 * 1_000_000_000)
    }
}

I can see three dates, and the END word, but never CANCEL :sob:

However, I can see CANCEL when I cancel the task that runs the brokenLoop function:

// Second app prints:
// 2021-10-13 16:40:53 +0000
// CANCEL
// END
@main
struct AsyncApp {
    static func main() async {
        do {
            let task = Task {
                await brokenLoop()
            }
            await Task.sleep(1_500_000_000)
            task.cancel()
        }
        
        // Give time to the runtime
        await Task.sleep(10 * 1_000_000_000)
    }
}

Because I can see CANCEL when the parent task is cancelled in the second app, I believe the async sequence correctly handles the cancellation flag.

Because I can not see CANCEL when the loop is broken and the parent task proceeds to completion, in the first app, I start having doubts about how break is handled.

When I ask Xcode for a memory snapshot, my lack of experience prevents me from understanding who is retaining the subscription (see screenshot). I can see something that looks like attached to an async iterator, but I don't understand who keeps the strong reference on it (and shouldn't):

Do you have an idea about what's wrong?

3 Likes

The full reproducing code, for the record:

code
import Combine
import Foundation

// Forever produces a Date every second
let dates = Timer
    .publish(every: 1, tolerance: 0, on: .main, in: .default)
    .autoconnect()
    .handleEvents(receiveCancel: { print("CANCEL") })
    .values

func brokenLoop() async {
    var counter = 0
    for await date in dates {
        print(date)
        counter += 1
        if counter == 3 { break }
    }
    print("END")
}

@main
struct AsyncApp {
    static func main() async {
        // OK: Publisher subscription is cancelled
        do {
            let task = Task {
                await brokenLoop()
            }
            await Task.sleep(1_500_000_000)
            task.cancel()
        }
        
        // NOT OK: Publisher subscription is never cancelled
        do {
            let task = Task {
                await brokenLoop()
            }
            await task.value
        }
        
        // Give time to the runtime
        await Task.sleep(10 * 1_000_000_000)
    }
}

Please file a feedback on this, there may be something happening here out of your control. it would be useful when filing that to attach a working sample of it and the memory graph.

Thank you @Philippe_Hausler: FB9700937 (break in async for loop does not cancel an async sequence)

Please tell me if more information is needed.

It hasn't yet percolated through the system yet, but I was able to reproduce it and it looks like it is definitely not you "holding it wrong" but instead it is a bug.

4 Likes

That's good news! This mean I can focus on something else for a while :wink:

1 Like

I was under the impression that breaking an async loop was not considered a cancellation but a normal finish?

Normal finish still needs to tell the iterator that it's time to stop iteration, and perform the final clean up. AFAIK, only the cancellation flag can send this message. It is lacking when the loop stops with break. Not only the iterator does not stop and perform clean up: it leaks.

I think I'm confusing myself between Task, AsyncSequence, and AsyncStream behaviors.

You're not alone ;-) In my sample code a Task is introduced as a way to wrap the for loop in a unit that can finish, or be cancelled, and exhibit the discussed behaviors.

To clarify here, breaking alone is NOT a source of cancellation - one could easily resume iteration past the break. However at the scope of deinitialization boundary the values are no longer used and so there is no longer any reference binding the potential execution of that publisher and so therefore it should be canceled. Long story short, in the example given it should hit a cancel but perhaps not from the reason you might fully have expected.

1 Like

Thanks @Philippe_Hausler and @Jon_Shier, I think I have a better understanding of async loop. I'm sorry I have used "cancellation" is an ambiguous and misleading fashion.

I now think that the bug (clean up is not performed when a loop is broken and the iterator falls out of reach) happens when the iterator does not perform cleanup when it is deallocated (or some of its lifetime-tracking property is deallocated).

As a support for this claim, I ran the sample code below, which defines two raw AsyncSequences, one with a struct iterator, one with a class iterator. The struct iterator has no way to perform cleanup, because, as @Philippe_Hausler says, breaking a loop is not a cancellation (after all, the iterator may be used later). Only the class iterator can, when it is deallocated.

This reveals a bug in Combine.Publisher.values, which can, and should perform cleanup (cancel the Combine subscription) when its iterator is deallocated.

I also think that AsyncStream/AsyncThrowingStream suffer from the same bug - I'll look for further evidence and follow up.

// Prints:
//
// Test class iterator
// 2021-10-14 06:05:28 +0000
// 2021-10-14 06:05:29 +0000
// 2021-10-14 06:05:30 +0000
// CLEANUP
// LOOP BROKEN
//
// Test struct iterator
// 2021-10-14 06:05:31 +0000
// 2021-10-14 06:05:32 +0000
// 2021-10-14 06:05:33 +0000
// LOOP BROKEN

import Foundation

struct TimerWithClassIterator: AsyncSequence {
    typealias Element = Date
    
    final class AsyncIterator: AsyncIteratorProtocol {
        func next() async throws -> Date? {
            if Task.isCancelled {
                print("CANCELLED")
                return nil
            }
            try await Task.sleep(nanoseconds: 1_000_000_000)
            return Date()
        }

        deinit {
            print("CLEANUP")
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

struct TimerWithStructIterator: AsyncSequence {
    typealias Element = Date
    
    struct AsyncIterator: AsyncIteratorProtocol {
        func next() async throws -> Date? {
            if Task.isCancelled {
                print("CANCELLED")
                return nil
            }
            try await Task.sleep(nanoseconds: 1_000_000_000)
            return Date()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

func brokenLoop<S: AsyncSequence>(_ s: S) async throws {
    var counter = 0
    for try await date in s {
        print(date)
        counter += 1
        if counter == 3 { break }
    }
    print("LOOP BROKEN")
}

@main
struct AsyncApp {
    static func main() async throws {
        print("Test class iterator")
        try await brokenLoop(TimerWithClassIterator())
        
        print("Test struct iterator")
        try await brokenLoop(TimerWithStructIterator())

        // Give time to the runtime
        await Task.sleep(10 * 1_000_000_000)
    }
}

I don’t understand. I believe the iterator is destroyed after break. How can you resume a non-existent iterator?

I’m not an expert in the new concurrency feature, but in Combine deinit doesn’t necessarily trigger custom cancellation.

In case of a for loop, yes, but not when you hold a reference to the iterator:

var iter = sequence.makeIterator()
while let value = await iter.next() {
  ...
  break // eventually
}

let oneMoreThing = await iter.next()

In a for loop, the iterator is out of reach, and thus can not be resumed. So I interpret Philippe's answer in the light of my mistake to put the focus on break: a break, generally speaking, is not enough to invalidate an iterator.

I have let many of my own confusions enter this thread, and I'm really sorry.

I’m not an expert in the new concurrency feature, but in Combine deinit doesn’t necessarily trigger custom cancellation.

Yeah, I'll try to clarify my expectations and vocabulary about this.

2 Likes

As an exercice, I have made several attempts at writing my own async sequence that wraps a Combine publisher.

When my async iterator subscribes with sink and stores the returned cancellable, I see no retain cycle, "breaking" an async loop deinits the iterator, and thus cancels the Combine subscription (this is the goal).

A similar variant is to instantiate a custom subscriber when the iterator is created, and have the iterator ask this subscriber to cancel the subscription when in iterator.deinit. That subscriber holds a weak reference to the iterator in order to avoid retain cycle. Again this setup gives the expected results.

However, as soon as the Iterator is also the Subscriber, then I witness a retain cycle that I can not break. The iterator, as a subscriber, is retained by the Combine subscription. As a consequence, it is not deallocated when async/await for loop drops its last reference to it, its deinit is not called, and the iterator can not trigger the subscription cancellation.

I don’t think so. He explicitly refer to your example, which is break in for loop.

It’s probably safe to interpret every break in this thread as “break in for loop”. I don’t think anyone expect break in while loop to perform an arbitrary method on a random variable.

So here is my conclusion: [Edit: I was wrong]

1. @gwendal.roue and I think ``` for await value in seq { break } ``` should translate into ``` var iter = seq.makeAsyncIterator() while let value = await iter.next() { iter.cancel() // compiler insert cancel before break break } ``` 2. In reality `iter.cancel()` is not get called at all. 3. @Philippe_Hausler @gwendal.roue and I all agree that `iter.cancel()` should get called somewhere. 4. I’m under the impression that @Philippe_Hausler think async loop should translate into ``` var iter = seq.makeAsyncIterator() while let value = await iter.next() { break } iter.deinit() // implied iter.cancel() here ```

Am I understand correctly?

Not quite, but don't take this as a critic :sweat_smile:

AsyncIterator has no cancel method, so iter.cancel() is not possible.

The Swift async runtime can communicate with the iterator in two ways only:

  1. via task cancellation
  2. via iterator falling out of scope when the Swift runtime no longer needs it (I mean that some object is no longer retained, and it can catch this event in its deinit)

That's the only possibilities, given the available apis.

On top of that, breaking a loop is not a "cancellation", as meant by Swift async/await:

// No task is cancelled here
Task {
    for await let value in sequence { ... /* break */ ... }
}
await Task.value

When a loop is interrupted early (via break or any other mean), the only way for the iterator to understand that its should perform cleanup is via some deinit.

The Swift runtime correctly releases memory (AsyncSequence, break, and cancellation - #12 by gwendal.roue).

This means that well-behaved iterators can clean up resources when a loop is interrupted.

This is not the case of the iterator created by Combine.Publisher.values. It definitely has a bug.

And I also think that Async(Throwing)Stream suffers from a similar problem, but I could not yet figure out a clear sample code.

2 Likes

Ok that’s embarrassing :sweat_smile:. Now I see your example as expected behavior. In my experience of reimplementing Combine, deinit and cancel are completely unrelated. They both release resources, but deinit is not build upon cancel. Subscribers like Sink and Assign doesn’t manually cancel its Subscription on deinit. Deinit of the Subscription is sufficient to releasing all resources.

Your instincts on the implementation of Combine are accurate - most things do not cancel in deinit because that can only be reached by terminal or explicit cancellation. The rub here is that if we hold the reactive specification as a nearly isomorphic rule set to Combine the rule 2.6 says "A Subscriber MUST call Subscription.cancel() if the Subscription is no longer needed." That infers in the case of the accessor for AsyncPublisher (the Publisher.values property) should relinquish the subscription via cancel in deinit.

Combine and AsyncSequence both loosely adhere to that specification (mainly because it is a pretty darned well thought out set of rules). There are a few exceptions to that, but in this case it is a violation.

1 Like

For the historical record, this was actually one of the first iterations of implementing AsyncSequence - where it synthetically generated a branch for cancellation at the invocation of break or early return etc. So I respect your gut reaction there - as it was the same as mine initially. In the end; we decided that having two systems of cancel was less than ideal and that favoring the task cancellation would have more compiler support.

4 Likes