[Pitch] Promotion of `Task.select` from swift-async-algorithms to Swift Concurrency

Task.select

[Source | Tests]

Introduction

A fundamental part of many asynchronous algorithms is being able to select the first resolved task from a given list of active tasks. This enables algorithms like debounce or merge where concurrent execution is needed but the cancellation or scoped execution behavior of groups is not desired.

Proposed Solution

Selecting the first task to complete from a list of active tasks is a similar algorithm to select(2). This has similar behavior to TaskGroup except that instead of child tasks this function transacts upon already running tasks and does not cancel them upon completion of the selection and does not need to await for the completion of all of the tasks in the list to select.

extension Task {
  public static func select<Tasks: Sequence & Sendable>(
    _ tasks: Tasks
  ) async -> Task<Success, Failure>
  
  public static func select(
    _ tasks: Task<Success, Failure>...
  ) async -> Task<Success, Failure>
}

Selecting over existing tasks allows for non-group-like behavior. The tasks themselves can be long running or selected again after a selected task has been found from a previous call to select has been made. By utilizing this strategy, the tasks being selected are no longer child tasks but instead their own independent tasks running until vending a result.

Taking the example of the makeDinner() from the structured concurrency proposal we can further investigate with an example of LineCook that models an iterative job cooking meals.

actor LineCook {
  enum CookingJob { 
    case veggies
    case meat
    case oven
  }
  
  enum CookingStep { 
    case veggies([Vegetable])
    case meat(Meat)
    case oven(Oven)
  }
  
  var activeJobs: Set<Task<CookingStep, Never>>
  
  func addJob(_ step: CookingStep) async {
    switch step {
    case .veggies:
      activeJobs.insert(Task {
        await .veggies(chopVegetables())
      })
    case .meat:
      activeJobs.insert(Task {
        await .meat(marinateMeat())
      })
    case .oven:
      activeJobs.insert(Task {
        await .oven(preheatOven(temperature: 350))
      })
    }
  }
  
  func finishedJob() async -> CookingStep {
    let finishedJob = await Task.select(activeJobs)
    activeJobs.remove(finishedJob)
    return await finishedJob.value
  }
}

In this example awaiting a finished job will select the first finished task out of the active jobs. Just because one job has been completed does not mean that other jobs should be cancelled. Instead it is desirable to let those jobs continue to execute.

Detailed Design

Given any number of Task objects that share the same Success and Failure types; Task.select will suspend and await each tasks result and resume when the first task has produced a result. While the calling task of Task.select is suspended if that task is cancelled the tasks being selected receive the cancel. This is similar to the family of TaskGroup with a few behavioral and structural differences.

Behavior withTaskGroup Task.select
Inner tasks Creates efficient child tasks. Works with pre-existing tasks.
Finishing Awaits all child tasks. Awaits first task.
Cancel on return Cancels child tasks when returning. Does not cancel on return.
Min tasks Supports 0 child tasks. Requires at least 1 task.

This means that withTaskGroup is highly suited to run work in parallel, whereas Task.select is intended to find the first task that provides a value. There is inherent additional cost to the non-child tasks so Task.select should not be used as a replacement for anywhere that is more suitable as a group, but offers more potential for advanced algorithms.

Currently Task.select is implemented with higher level constructs backing it in the Swift Async Algorithms package. Even though this API is relatively advanced in where it should be used, it feels more appropriate for general consumption from the swift concurrency library. Making this API have the new home of the swift concurrency library will allow for a considerably more efficient implementation but also have it along side the other APIs that work in the same arena.

Source compatibility

This proposal is additive and has no implication to source compatibility.

Effect on ABI stability

Implementation of the most optimized version of this proposal may have interoperation with the underlying implementation of Task itself. However since it can be achieved without needing to make those changes we feel that this is achievable without any disruption to ABI.

Effect on API resilience

This proposal is additive and has no implication to API resilience.

Alternatives Considered

A number of alternative names were considered when naming this: one early choice was Task.race, another was Task.first. The name race connotes perhaps less than ideal contexts since the goal of Swift concurrency is to avoid race conditions. First becomes lexically ambiguous; it is unclear on if it is referencing the first task in the sequence of tasks or the first task that finishes.

Another suggestion that was made for naming was Task.winner; this is a slightly better name than Task.race but still suffers similarly about what the behavior really is; in the case of winners it is perhaps inferred that losers are canceled somehow (which is not the desired territory of this API, that is in the domain of TaskGroup).

Acknowledgments

Yasuhiro Inami has offered some helpful early feedback about naming and behaviors.
Tony Parker helped review the initial implementation.
Kevin Perry helped review, test, and debug the initial implementation.

21 Likes

Am I correct to assume that Task.select works similarly to Golangs select statement, but only for receiving values?

https://gobyexample.com/select
https://go.dev/tour/concurrency/5

Yes it is very similar.

1 Like

What should happen if the waiting task has a higher effective priority than one or more of the tasks being blocked on? Should we just permanently escalate the priority of all the tasks, even if we can't de-escalate the other tasks after a different task finishes first?

1 Like

Yes, it should be fine to permanently escalate them. The same type of thing happens with DispatchQueue and NSOperationQueue; where a blocking action causes a domino effect of effective QoS that never settles down after it is “no longer needed” because there is no way to know if the remainder tasks won’t be utilized again (which likely in most use cases they would be used again)

2 Likes

Good point; I like the argument that, if a high-priority task has waited on this task once, it's likely to do so again later.

We should be able to set this up in the runtime so that we record that a task is waiting on a selection and take appropriate escalation action. @rokhinip, what do you think?

2 Likes

Thank you for the detailed pitch. I think there is a great application for a function like select (I've used the alternative in RxSwift and had a custom one in Combine for years).

The first thing that made me an impression in the title is the name of the API. So far Task is something to use to create tasks whether with Task.init(...) or Task.detached(...). (There are of course few more methods to aid doing other things.)

I understand the rationale for Task.select but to me personally it looks our of place looking at the rest of the API surface of the Task type. I see it kind-of fitting in the context of swift-async-algorithms, but out of that package, It feels like it was patched onto the type for the lack of a better place to add it.

Again, I'm a big fan of the async-algos, this is just expressing an honest concern

2 Likes

That concern is part of why this is a pitch; to discuss where it really should belong and how that is spelled.

The performance is definitely at a detriment being outside of the concurrency runtime. And to be honest the functionality feels like categorically it belongs with other task-based-things; which is the swift concurrency library.

So if I understand the concern correctly it is being homed in the concurrency library as that name right? Would it perhaps be better named as a static method on TaskGroup? Or would it perhaps be better as a global function?

I don't really have a problem with this being a static method on Task, unless perhaps we think we can make this an extension method on Collection.

While I recognize that select is something of a term of art for similar features, I don't particularly like the name. I think the Alternatives Considered section suffers from only considering single-word names. This is not going to be such a frequently-used API that making the method name a little longer will be a problem. Perhaps:

extension Collection<Task> {
  var firstToFinish: Task {
    get async { ... }
  }
}
9 Likes

I think one of the major use cases is to remove the first finishing task from collections, perhaps your example is onto something that maybe it should be removeFirstFinished() -> Task as a mutation method?

3 Likes

That does seem like a common use pattern. I guess the key question there is whether we want to require the caller to provide a mutable collection in order to use this functionality.

Also, does it make sense to allow a general collection here, or do we really need an Array or something similar? The runtime support will probably require an array and return an index into it, but of course we can abstract over that in the user-facing function.

Probably not.

Since we are asking for the first to finish out a batch of tasks, the order of the tasks being sent in don't really matter.

Array is only desired for determinism of which thing is tested; unlike select(2) which is non-deterministic order of which is selected, or select from Go which is also not defined of which order of things are executed. In non test scenarios the order won't matter of what is passed in.

But it could just as easily be an Array, ContiguousArray, Set or even OrderedSet or Deque. The common denominator there is Sequence.

However if it turned out that we can implement an efficient system with the only exposed API as just Array as the input I would be happy.

Does that syntax even work? Task is generic upon success/failure; we would need to put something in the type to ferry that info along right?

1 Like

You're right, that would rely on a feature (generic extensions) that we don't have right now. So unless we add that, this will need to be a static or global function somewhere. Task seems like the right place for it.

2 Likes

An instance method is also workable by dropping the constraint to its definition:

extension Collection {
    func firstToFinish<Success, Failure>() async -> Element where Element == Task<Success, Failure> {
        // ...
    }
}
3 Likes

Yep that definitely does work; but it feels like one of the situations where a property would be better suited. Seems to me this type of thing crops up from time to time as a limitation in the language.

5 Likes

The shape of this function is a little curious to me—if the returned task is known to be finished, we could in theory return its value directly, right?

extension Task {
  public static func select<Tasks: Sequence & Sendable>(
    _ tasks: Tasks
  ) async throws -> Tasks.Element.Success
}

I assume this is because we lose typing of the error.

It seems one consequence of returning the Task, though, is that a client must await its value despite it being available synchronously:

let firstToFinish = await Task.select([...])
let value = await firstToFinish.value // Never a suspension point?

In a context which is already asynchronous, this is OK, but it's a little misleading to a reader to require an await which never suspends.

1 Like

The reason for returning the task is to allow for removing the task from any work-lists. We could return both the task and the value as a tuple.

2 Likes

Unfortunately, we'd then only be returning it if the task didn't throw. Unless we return the value as a Result.

1 Like

To John's point, this feels like a compelling reason to introduce a variant which performs this mutation for you.

Returning the task itself seems tailored to the pitch's example of removal from a Set. If the backing storage were e.g. Array, it may be preferable to receive the index if it's used for removal.

I don't have enough familiarity with the use cases of this method to know if an array is likely or sensible as backing storage, but mutating instance methods over SetAlgebra and/or MutableCollection could have the same shape for clients while managing this discrepancy separately.

The current uses in swift-async-algorithms do not use a mutation of the store; it was more done as a generalization to make it apply to other scenarios.

It is used in merge but gets the .value immediately out of the task.

Likewise goes for debounce.

So in truth if you are willing to burn an enum to store things (like these do) then the task is not distinctly needed. I was not sure that was immediately obvious to folks that may need to use this API for other things than debounce or merge.