Accept the first Task to complete

I've been searching without luck for a method or pattern to accept the first member of a TaskGroup (or some other collection of Tasks) to complete and cancel the others. In Java, CompletableFuture has the anyOf method for this purpose. I'm assuming something similar exists in Swift Concurrency?

No need for special API, simply await the first result from the group.

1 Like

To expand on Jon's answer, you probably want something like:

let result = await group.prefix(1)
group.cancelAll()

https://developer.apple.com/documentation/swift/taskgroup/3814880-prefix

2 Likes

Thank you both for your help. Sadly grabbing the first member of the group is only part of the answer: It does return the first task to complete but still waits on the slowest task before returning. As the behavior I want was never specified, I'll assume it's not supported.

That’s not how TaskGroup works; tasks complete independently. Perhaps that’s just a prefix issue. You can also use group.next() or for await on the group to receive a single item.

1 Like

Testing it separately, I can confirm that prefix (or AsyncPrefixSequence) does not wait for all tasks to complete before returning its elements. You may be running into races between the completion of the group's tasks and your await of the prefix. For instance, in this example, there will be various numbers printed before the prefix is fired, then nearly all after the prefix will be properly cancelled, while a certain number complete normally due to being in process when the prefix cancelled the underlying group.

await withTaskGroup(of: Void.self) { group in
    for i in 0..<1_000 {
        group.addTask {
            guard !Task.isCancelled else { print("Cancelled \(i)"); return }
            
            print(i)
        }
    }
    
    for await _ in group.prefix(1) {
        print("Prefix")
        group.cancelAll()
    }
}
1 Like

You are right! Following your example, the fastest task does print quickly but the program does seem to wait afterwards. I think if you modified your example to just two tasks, one that finished quickly and another that took a few seconds, you would see what I mean.

Yes, you need to handle cancellation in long running tasks in order to cut their execution short. Cancellation is cooperative in Swift concurrency, so cancelling the group does not forcibly end all remaining work. You either need to check Task.isCancelled (only works in Xcode 13.2 or higher) or wrap your inner async work in withTaskCancellationHandler.

With a modified version of my earlier example, you can see the work is quickly cancelled, but all tasks still complete before the program exits.

await withTaskGroup(of: Int.self) { group in
    for i in 0..<1_000 {
        group.addTask {
            let nanoseconds: UInt64 = (100_000_000...500_000_000).randomElement()!
            
            guard !Task.isCancelled else { print("Cancelled \(i): \(nanoseconds)"); return i }
            
            do {
                try await Task.sleep(nanoseconds: nanoseconds)
                print("\(i): \(nanoseconds)")
            } catch {
                print("\(i): \(nanoseconds): \(error)")
            }
            
            return i
        }
    }
    
    for await value in group.prefix(1) {
        print("Prefix: \(value)")
        group.cancelAll()
    }
}

In this case you can see that the failable Task.sleep is cancelled while awaiting, letting the execution end early despite needing to wait for up to a second before actually completing. You'll need to structure your underlying async work to behave similarly if you want the program to exit immediately after work is cancelled.

You can also take a look at the implementation of Task.sleep to see how it ends immediately.

1 Like

However, sometimes waiting for the rest of the tasks to be canceled is too time-consuming.

To avoid this, the task-group code could be placed inside a task that closes over a checked continuation. When the first element is received, the task group can be cancelled and resume(returning:) can be called:

func doSomeWork() async {
    _ = await Task.detached {
        try! await Task.sleep(nanoseconds: 1_000)
    }.result
}

let start = Date.now

let _: Void = await withCheckedContinuation { continuation in
    Task {
        await withTaskGroup(of: Void.self) { group in
            // Add tasks to the group
            for _ in 0..<10_000 {
                group.addTask { await doSomeWork() }
            }

           // Indicate that the first element was found
            _ = await group.next()!
            continuation.resume()
            
            // Go through entire group.
            await group.reduce(()) { _, _ in () }
            print("Work finished after \(-start.timeIntervalSinceNow)")
        }
    }
}
print("Got element after \(-start.timeIntervalSinceNow)")
1 Like

You don't have to wait for the other child tasks to complete before the first result is available, as my tests show. So while your solution triggers the continuation right away, it doesn't actually save any execution time since you still await the uncanceled results of the group. So I don't see how this improves anything.

Right; you could call a function that accepts the result right away. The code I shared above simply fits the racing of tasks into the async-await model. Conceptually, you could rename the function to race:

func race<Value, Failure>(
    _ tasks: [() async -> Result<Value, Failure>]
) async -> Result<Value, Failure> {
    precondition(!tasks.isEmpty)

    // Use continuation method to extract first-to-complete result.
}

This async-await API is, in turn, useful for things like AsyncSequence operators — e.g. debounce where the next event of a sequence and a timer are raced.

1 Like

I see what you mean now. However, the form you present is rather inefficient since you don't even give the other tasks the opportunity to cancel. I haven't been able to build it yet, but we really need a construct that combines all of the various behaviors we've discussed recently, including:

  1. Ordered vs. unordered results. Unordered for things like race, ordered for proper user presentation perhaps.
  2. Limited vs. unlimited width. Enqueuing many Tasks is actually rather slow if you do it all once, due to what I believe is high contention around a lock. We need a construct to easily perform, say, 50,000 tasks, that automatically uses the best creation strategy.
  3. Automatic vs. manual cancellation of ongoing work. This is currently rather difficult partially due to an extant AsyncStream bug which has only been merged into main. We can use custom AsyncSequences to work around that issue by tracking the iterator's lifetime, but that's not ideal. But really, for something like race it should at least try to cancel the rest of the work if it's not going to be used.
  4. Gathering vs. not gathering results. It's one thing to just fire off async work, but sometimes you want to gather results into a collection or series of collections.

So we need a solution that offers customization along all of these axis simultaneously. I've built a few different constructs that do some of these things, but making things like creation strategies generic enough to be useful is pretty difficult.

I wonder why AsyncStream is mentioned. Is it being use as an implementation detail because the async APIs are too cumbersome?

There's no other provided construct to wrap around a group if you want a stream of results in the order they complete. It also provides its own abstraction around buffering, which can be useful, and is probably another axis of capability to consider. For instance, you could wrap an AsyncStream around one of the constructs mentioned in this thread and then get the expected first or prefix behavior automatically while also allowing the integration of automatic cancellation handling, without having to specialize your implementation for just the first result case.

1 Like

I omitted that part for simplicity's shake, but I reckon that adding group.cancelAll() after resuming the continuation will work (since the Task is not detached).

By ordered results, do you mean a task group that shows results in the order the tasks were added instead of the order in which they complete?

Hmm. What would an alternate creation strategy, compared to today's one, look like?

That's a really good point.


I think these points deserve their own evolution discussion.

Yes. If I provide an array of work items, I may want an array or stream of their results in the same order.

You can look a my examples in another thread to see what I mean. Essentially, creating lots of Tasks at once, whether child or unstructured, is really slow if you try to do it all at once (due to a lock on creation I think), but if you very slightly space them out, things run much faster. Also, the runtime doesn't seem to do any automatic work balancing of the enqueued tasks, so limiting your width can actually run faster with lower memory and thread usage.

2 Likes
Terms of Service

Privacy Policy

Cookie Policy