[Pitch #3] Structured Concurrency

We've prepared another revision of the Structured Concurrency proposal. Here is the current revision:

Here was the revision at the time this thread was started: swift-evolution/nnnn-structured-concurrency.md at eb149799aac93b3fb84a97adb2bb3db47179a21b · DougGregor/swift-evolution · GitHub

Relative to the previous pitch, we've made the following further changes in response to feedback:

  • Factored out the async let sugar and the with*Continuation APIs into their own proposals.
  • Added discussion about where newly spawned child and detached tasks run, tying into the recent pitch about executors.
  • Clarified the semantics of withCancellationHandler, and added an example of its use (thanks to @jayton)
  • Revised the static APIs on Task to be non-async, so that they are also available to non-async code when running on behalf of a task, as well as the ability to get Task instances representing other tasks, and an UnsafeCurrentTask object that can be used within the current task to access its state.
  • Task.Group now conforms to the AsyncSequence protocol to allow for await loops to iterate through the results of child tasks.
  • Adopted the @concurrent attribute for closures where appropriate in APIs like Task.runDetached and Task.Group.add.

Thanks for all of your ongoing feedback, everyone!

15 Likes

Thanks for polishing it up and the update @Joe_Groff :slight_smile:

As I'm going through and adjusting the implementation to match this, I realized we're not exposing group.isCancelled. It is pretty trivial to surface that one based on our impl, and I don't see a good reason not to.

The other way to notice that the group was cancelled is performing an add that will return false. The group's cancellation is distinct from the entire current task's cancellation. Cancelling the group cancels the group and all of it's children, but does nto cancel any other children the current task may have -- from prior to entering the group. We don't have a query-only group cancellation function, so I think we might want to add that one, any thoughts why we might not want to do that?

I really like how this is coming together. A few questions, some of which are perhaps just the result of typographic errors:

  • I'm a little unclear on the distinction between BodyResult and TaskResult in the Task.withGroup API. I think TaskResult is the type that every task added to the group is expected to produce, and BodyResult is the type produced by the group as a whole. Is that correct? If so, might it make more sense to rename them to GroupResult and ChildTaskResult for sake of clarity?

  • In this example, should it be try await mealHandle.get()? Or does Task.Handle implement the dynamic callable stuff?

    func eat(mealHandle: Task.Handle<Meal, Error>) {
      let meal = try await mealHandle()
      meal.eat() // yum
    }
    
  • In the section on the implementation of the currentPriority API, the discussion seems to have been truncated:

    The rationale for the default value is that if running outside of the Task infrastructure, there is no way to impact the priority of...

In the document you referenced, there is an example that shows how to process multiple elements concurrently in two stages - this one:

Example
/// Concurrently chop the vegetables.
func chopVegetables() async throws -> [Vegetable] {
  // Create a task group where each child task produces a Vegetable.
  try await Task.withGroup(resultType: Vegetable.self) { group in 
    var veggies: [Vegetable] = gatherRawVeggies()
    
    // Create a new child task for each vegetable that needs to be 
    // chopped.
    for i in veggies.indices {
      await group.add { 
        return veggies[i].chopped()
      }
    }

    // Wait for all of the chopping to complete, collecting the veggies into
    // the result array in whatever order they're ready.
    while let choppedVeggie = try await group.next() {
      veggies.append(choppedVeggie)
    }
    
    return veggies
  }
}

Having a little bit of trouble with understanding how this feature gonna work I tried to copy-paste this code into xcode, got latest snapshot from git and .... unexpectedly got this error:

Mutation of captured var 'veggies' in concurrently-executing code
Code that caused error
struct Vegetable {
  var isChopped = false
  func chopped() -> Vegetable { .init(isChopped: true) }
}
func chopVegetables() async throws -> [Vegetable] {
  // Create a task group where each child task produces a Vegetable.
  try await Task.withGroup(resultType: Vegetable.self) { group in
    var veggies: [Vegetable] = .init(repeating: .init(), count: 500)
    
    // Create a new child task for each vegetable that needs to be
    // chopped.
    for i in veggies.indices {
      await group.add {
        return veggies[i].chopped()
      }
    }
    
    // Wait for all of the chopping to complete, collecting the veggies into
    // the result array in whatever order they're ready.
    while let choppedVeggie = try await group.next() {
      veggies.append(choppedVeggie)
    }
    
    return veggies
  }
}

Is this error appears because the implementation is unfinished and will work in future or is this example erroneous? If it is, what are other good ways to implement concurrent processing with overlapping stages of computation with structured concurrency?

That's correct and we also are not super in love with the current names I think. I like your suggestions actually :+1:

We're not really jumping into sugar yet; generally you'll be working with async futures, and handles should come up only rarely (that's the plan at least), as such them looking a bit 'meh' is okey I think. We can always add sugar later on. I'm not sure the sugar should be () though; if anything there could be an Awaitable protocol that we can await mealHandle on -- though we explicitly are not jumping into doing such sugar just yet.

Thanks, fixed!

The intended wording is:

The rationale for the default value is that if running outside of the Task infrastructure, there is no way to impact the priority of a task if not using the task infrastructure after all.

Oh that's a great catch; the compiler does the right thing and it is the proposal that has the bug.

The add closure is concurrently executing and thus those array accesses may be unsafe. Note that the add operation is @concurrent @escaping which is how you can know this.

Solutions are:

  1. meh: capture veggies explicitly in a capture list; since we're not mutating the array itself here but only get the vegetable this will be fine. This will cause copy on write though I believe, as we concurrency pull results and return those chopped ones.
    for i in veggies.indices {
      await group.add { [veggies] in // meh
        return veggies[i].chopped()
      }
    }
  1. better: get the explicit veggie we need for this operation and capture it, rather than the entire array. This should not cause CoW on the array and should therefore perform better.
    for i in veggies.indices {
     let v = veggies[i]
      await group.add {
        return v.chopped()
      }
    }

I'll double check with folks and amend the proposal...


Double checked with @Douglas_Gregor and @Joe_Groff's latest semantics addition is actually even different.

It still will error so we need to amend the proposal, but in the following way:

  • the veggies are captured in add, this means they cannot be allowed to be mutated "under its feet"
  • the mutating happening in the next() result collection will be disallowed and cause a compile time error.

This ensures that the mutation can not change the values of the previously captured array, as that would be unsafe. So we're going to prevent the mutation, not the capture.

The solutions I guess are the same as listed above, and we'll need to fix the proposal. Thanks for spotting it @SERENITY :+1:

Sorry, I was unclear; I'm not proposing sugar here. I was noting that it looks to me like the proposal text is using sugar that has not actually been proposed, by eliding the .get() call. Either that or I'm not understanding what's happening.

1 Like

Oh, thank you! :pray: I misread your comment to be suggesting that, will fix the proposal thanks :blush: you can also send in a PR to add the missing get() if you want yourself.

// fixed add missing .get() by ktoso · Pull Request #54 · DougGregor/swift-evolution · GitHub

Reading through the proposal I think there are a few errors in the code examples that make the proposal less clear.

I believe the “Concurrently chop the vegetables” code example in the “Proposed solution > Task groups and child tasks” section has an error in it.

It looks like the result will an array of the raw veggies with the chopped veggies appended to it.

Also in the section “Detailed design > Task API > Task handles” the code sample func eat(mealHandle:) is described as demonstrating the get() method, but doesn't use the get() method.

1 Like

Thanks for the continued iteration!


In the concurrent version of makeDinner(), it seems like we'd have enough information to declare the local variables as delayed-initialization constants:

func makeDinner() async throws -> Meal {
  let veggies: [Vegetable] // edit: non-optional 
  …
  
  try await Task.withGroup(resultType: Void.self) { group in
    await group.add {
      // Assigned at most once:
      veggies = try await chopVegetables()
    }
    …
  }

  // If we reach this point, all child tasks completed successfully, so we know
  // the constants were initialized
  let dish = Dish(ingredients: [veggies, …])
  …
}

If I have that right, then some of the justification for async let in Future directions (around "leading to a crash on unwrap") also don't apply.


As noted in the detailed design, with the additional of AsyncSequence conformance, the gather pass in the chopVegetables() example could be written like so:

    // Wait for all of the chopping to complete, collecting the veggies into
    // the result array in whatever order they're ready.
    for try await choppedVeggie in group {
      veggies.append(choppedVeggie)
    }

If I understand correctly, using while and group.next() is equivalent, but it seems like the for-try-await spelling would be preferred. In synchronous code with a Sequence I'd certainly expect developers to use the for sugar instead of while. Does that intuition transfer here? Is this just a matter of style?

2 Likes

What about having task groups default to launching child tasks on the executor of the current task? This would follow the principle of least astonishment: I would not expect the executor of an async function to change where there is no explicit mention of executors.

Also, from the perspective of a UIKit app developer, I would not want my code running on anywhere but the main thread without explicit mention of such. Ideally, perhaps, Task.runDetached would then always require an explicit executor, but I understand if that is too cumbersome for other domains. Even more ideally, though, Task.runDetached would not even exist, and tasks would always belong to an explicit scope that would define the executor.

It's a different language with different goals and constraints, but to reference some prior art on the matter, in Kotlin jobs (tasks) are always launched in the context of an explicit scope, and child jobs default to the parent's dispatcher (executor):

val job = myCoroutineScope.launch {
    // On the dispatcher defined by myCoroutineScope. For scopes
    // related to UI components, this would be Dispatchers.Main
    
    val childOne = async {
        // on the parent job's dispatcher
    }
    val childTwo = async(Dispatchers.Default) {
        // explicitly on the Default (concurrent) dispatcher
    }
    
    childOne.await()
    childTwo.await()
}
1 Like

strong and weak Tasks.

It might be helpful to mark Tasks in a TaskGroup as either strong or weak to define their influence on terminating their group.

For a group to finish, all their strong tasks must have been finished. Weak tasks which are still running when all strong tasks finished will be canceled.

This allows scenarios like this, where we have a group consisting of:

  • A strong task to drive a robot for some distance.
  • A weak task to blink the led on the robot while it is moving.

The group should finish as soon a the robot drive task has finished independent of whether the blinking task is still in progress.

sounds like strong task group could run in 1 task group, and weak task griup runs in another (detached) task group.

That might be possible, but I am missing a bit the structured concurrency feel to it.

URLSessionTask.priority is a floating-point value between 0.0 (lowest) and 1.0 (highest).

Task.Priority could use similar names and raw values. For example:

extension Task {

  public struct Priority: Hashable, RawRepresentable {

    public static let lowest:  Self = 0.0  // background
    public static let low:     Self = 0.25 // utility
    public static let medium:  Self = 0.5  // default
    public static let high:    Self = 0.75 // userInitiated
    public static let highest: Self = 1.0  // userInteractive

    public typealias RawValue = Float32

    public let rawValue: RawValue

    public init(rawValue: RawValue) {
      self.rawValue = rawValue // FIXME: rawValue.clamped(to: 0...1)
    }
  }
}

extension Task.Priority: ExpressibleByFloatLiteral {

  public typealias FloatLiteralType = RawValue

  public init(floatLiteral rawValue: RawValue) {
    self.init(rawValue: rawValue)
  }
}

extension Task.Priority: Comparable {

  public static func < (_ lhs: Self, _ rhs: Self) -> Bool {
    lhs.rawValue < rhs.rawValue
  }
}
3 Likes

I have a three questions:

  1. Is this the proposed syntax? I'm looking at makeDinner with using task groups and it's extremely verbose.

There are 7 await in the piece of code below.

  // Create a task group to scope the lifetime of our three child tasks
  try await Task.withGroup(resultType: Void.self) { group in
    await group.add {
      veggies = try await chopVegetables()
    }
    await group.add {
      meat = await marinateMeat()
    }
    await group.app {
      oven = await preheatOven(temperature: 350)
    }
  }
  1. From the code above when does chopVegetables start executing? After or before marinateMeat.

  2. In that makeDinner example, the code is forcing the group to wait for the oven heating (probably to show how it works) when you actually need that in the final line of the method. My question is: can preheatOven run like a detached Task while the group runs its two task?

1 Like

Task.withGroup seems really verbose for everyday use. Would the current API allow the following type of utility?

func async all<T1, T2, T3>(_ a1: async () -> T1, _ a2: async () -> T2, _ a3: async () -> T3) -> (T1, T2, T3) {
	return await Task.withGroup(resultType: (T1, T2, T3).self) { group in
		let v1 = await group.add { await a1() }
		let v2 = await group.add { await a2() }
		let v3 = await group.add { await a3() }
		return (v1, v2, v3)
	}
}

This would significantly ease making makeDinner:

let (choppedVeggies, marinatedMeat, preheatedOven) =
  all(chopVegetables, marinateMeat, preheatOven)

It certainly seems like the API allows a utility like this but just wanted to make sure it is possible.

4 Likes

If you just need to execute asynchronous functions concurrently without specifying priorities or executors, you can use async let (to be moved in a separate proposal):

async let veggies = chopVegetables()
async let meat = marinateMeat()
async let oven = preheatOven(temperature: 350)

let result = await (veggies, meat, oven)

Do note that there are some inconsistencies in the example code snippet you provided:

  • Task.Group.add doesn't return the result of its passed function, it returns instead a boolean (true if the job has been added, false if it hasn't because the group was cancelled in the meantime), so v1, v2 and v3 are booleans.
  • The async keyword must be placed between the parameter list and the -> return arrow, like the throws keyword.
  • An all function as suggested but valid in general would require variadic generics, a feature not yet available in Swift.

Ok. It seems async let is the answer to my question. "Everyday" use would be async let and more advanced usages would use Task.withGroup or others.

Indeed, most uses of child tasks are likely to be introduced by async let (not part of this proposal, as it has been separated out to it's own proposal).

Task groups are dynamic though -- you can spawn "n" child tasks in a structured way using them, where n is not known.

Async lets you have to actually "write out n times", so they're not dynamic.


Task groups are a low level building block. All kinds of fancy sugar on top of them can arrive later, and I'm more than happy to jump into those separately but this pitch isn't the right time for those.

Long story short though, it should some day be possible to nicely write gather(first: 3) { <some tasks> } and similar things; but they'd all be implemented using task groups internally.

Terms of Service

Privacy Policy

Cookie Policy