Following 0304-structured-concurrency.md

I’m looking at the last section about group cancellation in the 0304-structured-concurrency proposal.

I’m trying to access the group inside the addTask closure, but it’s not compiling.

Here is a simple example:

func chopVegetables() async throws {
    try await withThrowingTaskGroup(of: String.self) { group in
        print(group.isCancelled) // prints false

        group.addTask {
            group.cancelAll() // Cancel all work in the group --> Error
            throw TaskGroupError.failedOne
        }
    }
}

It gives me these errors:

Escaping closure captures 'inout' parameter 'group'
Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure

From an earlier part of the same proposal, I read that the group object should not be used elsewhere and that the instance should not be copied out of the body function, because doing so can break the child task structure. Is that the reason it is not allowed?


First, we should note that group.cancelAll() frequently is not needed in a throwing task group. Your example omits the try await of the task results, but if you include it, the remaining tasks will be cancelled for you as soon as one of them throws. I draw your attention to the second bullet point from the ThrowingTaskGroup documentation:

Consider:

func strings() async throws -> [String] {
    try await withThrowingTaskGroup(of: String.self) { group in
        group.addTask { try await foo() }
        group.addTask { try await bar() }

        var strings: [String] = []
        for try await string in group {
            strings.append(string)
        }
        return strings
    }
}

Because we try await the group results, if either foo or bar throws an error, withThrowingTaskGroup will automatically cancel all of its remaining child tasks and rethrow that error. A more complete example of this is at the end of this answer.


Second, while it’s generally not necessary to employ group.cancelAll(), there are scenarios where you need more fine-grained control over when you cancel all the tasks. You’d use this pattern when you want to decide, based upon the particular outcome of a task, whether to cancel the group, rather than cancelling whenever any task throws.

For example, let’s imagine that:

  • we’re downloading a ton of assets;
  • we want to cancel any unfinished downloads if we lose our internet connection; but
  • we want to ignore other types of errors (e.g., if one of the URLs is no longer valid).

In that case, you could use cancelAll. But you wouldn’t do it inside the addTask:

func urlsForAllDownloads(from urls: [URL]) async throws -> [URL: URL] {
    try await withThrowingTaskGroup(of: (URL, URL).self) { group in
        // create tasks for all the downloads

        for url in urls {
            group.addTask {
                (url, try await urlForSingleDownload(from: url))
            }
        }
        
        // gather the results in a dictionary
        
        var urlMapping: [URL: URL] = [:] // a map from remote web URL to local file URL of downloaded resource

        // let’s loop through the downloads, but stop trying if we lose the internet connection; we’ll try again later

        repeat {
            do {
                guard let (webURL, fileURL) = try await group.next() else { break }

                urlMapping[webURL] = fileURL
            } catch let error as URLError where error.code == .notConnectedToInternet {
                group.cancelAll() // cancel any downloads not yet finished; no point in trying all the other downloads if the internet connection is down; we’ll try again later
            } catch {
                // ignore other errors, effectively skipping download for anything other than loss of internet
            }
        } while !Task.isCancelled
        
        return urlMapping
    }
}

The key observation is that one uses group.cancelAll() outside the addTask, but still inside the body of withThrowingTaskGroup.

Needless to say, this manual cancellation of all the tasks is less common, but where you need this fine-grained control, it is invaluable. But, generally, we just let the errors propagate up (as in our first example), and we’re done.


As for why they designed it this way: mechanically, the group is handed to the body as an inout parameter, and the addTask closure escapes that scope, which is exactly what the "Escaping closure captures 'inout' parameter 'group'" error is telling you. And while cancelling an entire task group from within an individual task might have some intuitive appeal, other uses of the task group inside addTask could be confusing or problematic. For example, what would it mean to await group.next() from inside an addTask closure? That is misleading at best and could even lead to a task that never finishes.
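Applied to the chopVegetables example from the question, the idiomatic fix is for the child task simply to throw and for the body to try await the results. A minimal sketch follows; TaskGroupError is not shown in the question, so its definition here is assumed, and the second, sleeping "carrot" child is purely illustrative.

```swift
// Assumed definition: the question uses TaskGroupError.failedOne but does not show it.
enum TaskGroupError: Error {
    case failedOne
}

func chopVegetables() async throws {
    try await withThrowingTaskGroup(of: String.self) { group in
        group.addTask {
            // No group.cancelAll() here; just throw.
            throw TaskGroupError.failedOne
        }
        group.addTask {
            // Illustrative slow child; it is cancelled once the sibling's error surfaces.
            try await Task.sleep(for: .seconds(5))
            return "carrot"
        }

        // Awaiting the results rethrows the first child error, and the group
        // then cancels (and awaits) the remaining children automatically.
        for try await vegetable in group {
            print(vegetable)
        }
    }
}
```

Nothing inside addTask touches group, so the inout-capture error disappears, and chopVegetables() throws failedOne well before the five-second sleep would have finished.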


FWIW, here is an MRE of my first example. I have modified it to show that when bar throws barFailed, the task group detects that and automatically cancels all pending child tasks, so foo is cancelled and throws CancellationError (not barFailed):

func experiment() async {
    do {
        let result = try await strings()
        print(result)
    } catch {
        print("calling task caught", error)
    }
}

func strings() async throws -> [String] {
    try await withThrowingTaskGroup(of: String.self) { group in
        group.addTask { try await foo() }
        group.addTask { try await bar() }
        
        return try await group.reduce(into: []) { $0.append($1) } // a concise alternative to `for` loop
    }
}

// Return “foo” in five seconds unless cancelled.

func foo() async throws -> String {
    let start = ContinuousClock.now
    print("foo start")
    
    do {
        try await Task.sleep(for: .seconds(5))
        return "foo"
    } catch {
        print(#function, "caught", error, start.duration(to: .now))
        throw error
    }
}

// Generally we wouldn’t bother catching and rethrowing the error above; that was for illustrative purposes only.
// We’d generally simplify it to the following:
//
// func foo() async throws -> String {
//     try await Task.sleep(for: .seconds(5))
//     return "foo"
// }

// throw an error after 1 second

func bar() async throws -> String {
    let start = ContinuousClock.now
    print("bar start")
    
    do {
        try await Task.sleep(for: .seconds(1))
    } catch {
        print(#function, "caught", error, start.duration(to: .now))
        throw error
    }
    
    print(#function, "throwing", ExperimentError.barFailed, start.duration(to: .now))
    throw ExperimentError.barFailed
}

enum ExperimentError: Error {
    case barFailed
}

If I call experiment, that will produce:

foo start
bar start
bar() throwing barFailed 1.066956375 seconds
foo() caught CancellationError() 1.068187041 seconds
calling task caught barFailed

Style note: we usually refer to Swift language/library evolution proposals using the prefix "SE-" and then just the number. In this case, this would be SE-0304.

Good answer, thank you @robert.ryan.

As for the example not compiling... Perhaps this was not diagnosed when the proposal first came out?

The problem that SE proposals face "as documentation" is that they represent a point in time when the proposal was accepted, and concurrency checking keeps evolving, so things like this may happen. If we had more documentation reflecting the "current" state of the language, this would be less of a problem, as we would rely less on these point-in-time snapshots also serving as documentation.


Noted. I'll refer to the Swift language/library evolution proposal as SE-0304.


Thanks a bunch for the detailed explanation! It makes perfect sense how a throwing task group can cause an error to propagate to all its child tasks. But when we use withTaskGroup, how do we cancel all the tasks?

func strings() async -> [String] {
    await withTaskGroup(of: String.self) { group in
        group.addTask { await foo() }
        group.addTask { await bar() }

        return await group.reduce(into: []) { $0.append($1) } // a concise alternative to `for` loop
    }
}

func foo() async  -> String {
    "Foo" // -> Something happens here outside the scope of the group 
}
func bar() async  -> String {
    "Bar"
}

The only way I can think of is to return the optional (in this case, the String?) and cancel the group when extracting the value from the child task. However, since all the tasks are running in parallel, there’s a chance that some of them might have already finished.

I’m trying to grasp these concepts, so please correct me if I’m mistaken.

You can adapt @robert.ryan's example from above:

func experiment2() async {
    print (#function, "...")
    let result = await strings()
    print(result)
}

private func strings() async  -> [String] {
     await withTaskGroup(of: String?.self) { group in
         let fv: [() async -> String?] = [foo, bar, fubar]
         for f in fv {
             group.addTask {await f()}
         }
        
         var results: [String] = []

         while let result = await group.next() {
             guard let result else {
                 group.cancelAll()
                 break
             }
             
             results.append (result)
             if results.count == fv.count {
                 break
             }
         }
         
         // possibly partial
         return results
    }
}

private func foo() async -> String? {
    do {
        try await Task.sleep(for: .seconds(5))
        if Int.random (in: 0..<2) > 0 {
            return "foo"
        }
        else {
            return nil
        }
    } catch {
        return nil
    }
}

private func bar() async -> String? {
    try? await Task.sleep(for: .seconds(1))
    if Int.random (in: 0..<2) > 0 {
        return "bar"
    }
    else {
        return nil
    }
}

private func fubar() async -> String? {
    try? await Task.sleep(for: .seconds(1))
    if Int.random (in: 0..<2) > 0 {
        return "fubar"
    }
    else {
        return nil
    }
}


There is a lot to unpack here.

A minor terminological clarification (and I apologize for splitting hairs): If a throwing task group is cancelled (say, if one of its subtasks throws an error), it does not propagate the error to the other child tasks. It cancels those tasks. That is very different.

Let us again refer to the documentation, this time for TaskGroup. It describes two ways to cancel the task group:

This is very similar to the ThrowingTaskGroup documentation that I referenced in a prior answer, except this one only has two of those options (because throwing is off the table). And of these two options, the second one is more common. We use it all the time. So, I’m going to focus on that.

At this point, it would be useful to look at examples. But, if you forgive me, I’m going to temporarily sidestep yours, as it suffers from a lot of problems that might distract us from the cancellation question.

Instead, let us consider a very practical example. And, like I said, I’m going to focus on the “cancel the task running the task group” scenario, as that is more common.

So, let’s imagine we have an iOS app, and a view wants to know when the app became active and when it went into background. Maybe you want to restore/save something. Or maybe you want to update the UI. Regardless, you might have routines that monitor these lifecycle events:

func monitorDidBecomeActive() async {
    for await _ in NotificationCenter.default.notifications(named: UIApplication.didBecomeActiveNotification) {
        await becameActive() // this is our function that will handle when it became active
    }
}

func monitorDidEnterBackground() async {
    for await _ in NotificationCenter.default.notifications(named: UIApplication.didEnterBackgroundNotification) {
        await enteringBackground()  // this is our function that will handle when app goes into background
    }
}

And when we want to run them concurrently, we might do:

func monitorLifecycleEvents() async {
    await withTaskGroup(of: Void.self) { group in
        group.addTask { await monitorDidBecomeActive() }
        group.addTask { await monitorDidEnterBackground() }
    }
}

So, how would you cancel that when, say, the view in question is dismissed? The answer is generally to cancel the Task running this task group. To do this, in UIKit, we would create a property for the Task, e.g., set it in viewDidAppear, and cancel it in viewDidDisappear:

class ViewController: UIViewController {
    var lifecycleEventsTask: Task<Void, Never>?
 
    override func viewDidAppear(_ animated: Bool) {
        super.viewDidAppear(animated)
    
        lifecycleEventsTask = Task { await monitorLifecycleEvents() }
    }
    
    override func viewDidDisappear(_ animated: Bool) {
        super.viewDidDisappear(animated)
        
        lifecycleEventsTask?.cancel()
        lifecycleEventsTask = nil
    }
}

Or in SwiftUI it is even easier, because SwiftUI automatically cancels the task launched in a .task view modifier when the view disappears:

struct ContentView: View {
    var body: some View {
        VStack {…}
            .task {
                await monitorLifecycleEvents()
            }
    }
}

So, in both UIKit and SwiftUI, you generally just cancel the task that is running the withTaskGroup, and you’re done. No need for group.cancelAll() at all.
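To see that cancelling the parent Task is enough, here is a minimal, platform-independent sketch. The hypothetical monitor(_:) function stands in for monitorDidBecomeActive/monitorDidEnterBackground (it just polls until cancelled instead of awaiting notifications), and monitorBoth() counts how many monitors stopped so the effect is observable.

```swift
// Hypothetical stand-in for a lifecycle monitor: runs until its task is cancelled.
func monitor(_ name: String) async {
    while !Task.isCancelled {
        // Task.sleep throws CancellationError on cancellation; try? ignores it
        // and the loop condition then sees the cancellation.
        try? await Task.sleep(for: .milliseconds(100))
    }
    print(name, "stopped")
}

// Runs both monitors in a task group and reports how many of them finished.
func monitorBoth() async -> Int {
    await withTaskGroup(of: Void.self, returning: Int.self) { group in
        group.addTask { await monitor("didBecomeActive") }
        group.addTask { await monitor("didEnterBackground") }

        var stopped = 0
        for await _ in group { stopped += 1 }
        return stopped
    }
}
```

If you start it with `let task = Task { await monitorBoth() }` and later call `task.cancel()`, both children stop and `await task.value` returns 2, even though group.cancelAll() is never called: cancellation of the parent task propagates to every child in the group.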


Now, you technically can use group.cancelAll() with a non-throwing withTaskGroup, too. But we just don’t use this pattern too often, because if a function returns a value, but also supports cancellation, the idiomatic pattern is to (a) make it a throwing function; and (b) ensure that it throws CancellationError if cancelled. And that reduces the problem to our prior withThrowingTaskGroup examples.
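A minimal sketch of (a) and (b), using a hypothetical expensiveWork(steps:) function: it is declared throws and calls the standard library's Task.checkCancellation(), which throws CancellationError once the current task has been cancelled.

```swift
// Hypothetical worker: returns a value, but honours cancellation by throwing.
func expensiveWork(steps: Int) async throws -> Int {
    var total = 0
    for step in 0..<steps {
        // (b) throws CancellationError if the current task has been cancelled
        try Task.checkCancellation()
        total += step
        await Task.yield()   // give other tasks a chance to run
    }
    return total
}
```

Called normally, `try await expensiveWork(steps: 5)` returns 10; if the enclosing task is cancelled, it throws CancellationError instead of returning a partial total, so a withThrowingTaskGroup around such calls behaves like the earlier examples.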

But, like you said, you theoretically can use optionals rather than the more idiomatic throwing pattern:

func experiment() async {
    let result = await strings()
    print(result)
}

func strings() async -> [String] {
    await withTaskGroup(of: String?.self) { group in
        group.addTask { await foo() }
        group.addTask { await bar() }
        
        var strings: [String] = []

        for await string in group {
            guard let string else {   // if we get a nil value, cancel and return
                group.cancelAll()
                return []
            }
            strings.append(string)
        }
        
        return strings
    }
}

// return “foo” in 5 seconds, unless canceled, in which case it will return `nil`

func foo() async -> String? {
    let start = ContinuousClock.now
    print("foo start")
    
    do {
        try await Task.sleep(for: .seconds(5))
        print("foo successfully finished in", start.duration(to: .now))
        return "foo"
    } catch {
        print("foo caught", error, "and is return `nil` in", start.duration(to: .now))
        return nil
    }
}

// return `nil` after one second

func bar() async -> String? {
    let start = ContinuousClock.now
    print("bar start")
    
    do {
        try await Task.sleep(for: .seconds(1))
        print("bar is returning `nil` in", start.duration(to: .now))
        return nil      
    } catch {
        print("bar caught", error, "and is stopping in", start.duration(to: .now))
        return nil
    }
}

The key differences between this example and your non-throwing example include:

  • using Task.sleep so the tasks take long enough that we can reliably manifest the cancellation; and
  • actually supporting cancellation in foo and bar (in this case, achieved by using Task.sleep). There is obviously no point in worrying about task groups cancelling child tasks if those child tasks do not support cancellation in the first place.

Anyway, that yields:

foo start
bar start
bar is returning `nil` in 1.055175833 seconds
foo caught CancellationError() and is return `nil` in 1.058339708 seconds
[]

You also asked:

Yes, whenever dealing with task group cancellation, there is always a chance that some child tasks have finished. You just decide whether you want to return the partial results that finished before the cancellation or whether you want to throw (or return nil or whatever) if any of the tasks failed. It just depends upon the use case.
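Here is a minimal sketch of making that decision explicit, with a hypothetical CancellationPolicy enum and a hypothetical gather(_:policy:) helper whose children merely return canned values (nil standing in for a failed task). Because child tasks complete in no particular order, the partial results are not guaranteed to contain any particular subset.

```swift
// Hypothetical policy: keep whatever finished, or give up entirely.
enum CancellationPolicy {
    case keepPartialResults
    case allOrNothing
}

// Hypothetical helper: each child just returns one of `values` (nil means "failed").
func gather(_ values: [String?], policy: CancellationPolicy) async -> [String]? {
    await withTaskGroup(of: String?.self, returning: Optional<[String]>.self) { group in
        for value in values {
            group.addTask { value }
        }

        var results: [String] = []
        for await result in group {
            guard let result else {
                group.cancelAll()   // cancel the children that have not finished yet
                return policy == .keepPartialResults ? results : nil
            }
            results.append(result)
        }
        return results
    }
}
```

With `.allOrNothing`, a single nil makes the whole call return nil; with `.keepPartialResults`, you get whatever happened to finish before the failure was observed.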
