[Concurrency] Evolving the Concurrency design and proposals

Two safe versions could be provided, though: one that checks that the continuation has been resumed only once, and ignores further resumes, and one that performs the same check and fatal errors in case of multiple resumes. For the first one, this could give:

func request() async -> SomeResult {
    await Task.withContinuation { continuation in
        requestWithClosure { result in
            continuation.resume(returning: result)
        }
    }
}

The misuse of not resuming is not caught. I think we can still remove the "unsafe" qualifier, since all future/reactive/promise libs out there also just rely on documentation in order to prevent such a mistake. Maybe this last sentence belongs to the "old world", filled with unsafety and fragile conventions, and some would consider those convenience checks as no less unsafe than the unchecked version. I don't know.
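For what it's worth, the "ignore further resumes" variant can be sketched on top of the continuation API that eventually shipped in Swift 5.5 (withCheckedContinuation). ResumeOnce and requestWithClosure below are hypothetical names made up for illustration, not anything from the proposal, and the sketch assumes a lock is an acceptable way to guard the flag:

```swift
import Foundation

// Hypothetical helper: forwards the first resume and silently drops later ones.
final class ResumeOnce<T: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<T, Never>?

    init(_ continuation: CheckedContinuation<T, Never>) {
        self.continuation = continuation
    }

    func resume(returning value: T) {
        lock.lock()
        let pending = continuation
        continuation = nil          // any later call finds nil and does nothing
        lock.unlock()
        pending?.resume(returning: value)
    }
}

// Hypothetical, misbehaving completion-based API: it calls back twice.
func requestWithClosure(_ completion: @escaping @Sendable (Int) -> Void) {
    completion(1)
    completion(2)
}

func request() async -> Int {
    await withCheckedContinuation { continuation in
        let once = ResumeOnce(continuation)
        requestWithClosure { result in
            once.resume(returning: result)
        }
    }
}
```

As in the text above, nothing here catches the case where the completion handler is never called at all.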


I think so. Avoiding this is a large part of the benefit of structured concurrency, and marking the escape hatch with Here Be Dragons seems desirable and even necessary.


This is the easy path.

One also has to consider the developers who will wrap existing, well-behaved completion-based APIs. Do we really want their code to be riddled with "unsafe" calls when they do nothing wrong, and when their only way to get rid of them is to write the convenience wrappers I just described, possibly incorrectly?

Unsafety-in-your-face can just be pedantic without benefits, and even have plain bad consequences. I suggest an open mind.


I think there was a comment on another thread saying that the second one, dynamically enforcing at-most-once behaviour, was intended to be baked in.

However, transitioning from async semantics to continuation-that-might-not-resume semantics is unsafe, and requires appropriate care. I don't see any benefit to an API that glosses over this.


Never mind. The rationale for an API without "unsafe" in its name is there, for anyone interested. Have a nice day!

It does show the "entry into the async world":

// can be not-async world
let dinnerHandle = Task.runDetached { // () async -> Dinner in 
  // async world
  await makeDinner()
}
// can be not-async world

We can add this implicit type of the closure to the writeup though to make it more clear, thanks.
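For readers on a current toolchain, here is a minimal sketch of the same "entry into the async world", assuming the API as it later shipped in Swift 5.5, where Task.runDetached became Task.detached and the handle is a Task value. Dinner, makeDinner and startDinner are placeholder names for illustration:

```swift
// Placeholder result type and async function, for illustration only.
struct Dinner: Sendable {
    let dish: String
}

func makeDinner() async -> Dinner {
    Dinner(dish: "pasta")
}

// A synchronous function: the closure passed to Task.detached is the
// boundary where the async world starts.
func startDinner() -> Task<Dinner, Never> {
    Task.detached {
        // async world: the closure is implicitly `() async -> Dinner`
        await makeDinner()
    }
}
```

The synchronous caller only gets the handle back; reading the result (handle.value) is itself an await and has to happen in async code.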


Ah I see. As a Swift user who only casually follows Swift's evolution, I had expected this to be expressible explicitly with the async/await syntax, somewhat like this:

func request() async -> SomeResult {
    async var asyncResult: SomeResult
    requestWithClosure { result in
        asyncResult = result
    }
    return asyncResult
}

I'd love async to extend to properties in that way, but there are probably a million technical reasons for why that's impossible :cry:

Why not go the other way, allowing "throws async" instead of banning "try await"? We would probably need to allow arbitrary orders, including letting the call site use a different order than the declaration site, when/if we move to a general effects mechanism. (Hmm, should we do the general effects system first, then add async/await?)

I think enforcing a clear order is a good idea. It would save the community from writing linters/formatters to normalise the order. But it is not clear to me why try await was preferred over await try.

I'm reading await and try as prefix operators, applied from right to left.

  1. await try foo() means "try to start an asynchronous operation, and if it starts wait for the result". Signature of foo being equivalent to foo() -> Result<Promise<T>, Error>.
  2. try await foo(), means "launch an asynchronous operation which may fail, wait for it to finish and try to get its result". Signature of foo being equivalent to foo() -> Promise<Result<T, Error>>.

Proposal models the second case, so IMO, try await and throws async make more sense.
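To make the second reading concrete, here is a small sketch in the spelling Swift eventually adopted (try await at the call site). RequestFailed, fetchValue and fetchAsResult are made-up names; the point is only that the failure is observed after the wait, which is the Promise<Result<T, Error>> shape:

```swift
struct RequestFailed: Error {}

// An asynchronous operation which may fail.
func fetchValue(succeed: Bool) async throws -> Int {
    guard succeed else { throw RequestFailed() }
    return 7
}

// Wait for the operation to finish, then try to get its result.
func fetchAsResult(succeed: Bool) async -> Result<Int, Error> {
    do {
        return .success(try await fetchValue(succeed: succeed))
    } catch {
        return .failure(error)
    }
}
```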


How do we turn a long-running (e.g., compute-intensive) synchronous function into a non-blocking async primitive (to take full advantage of structured concurrency and run several in parallel)? Do we need to manually resort to DispatchQueue/Operation/OperationQueue? Or is there a Task API ready for that?

Can we still call Task.checkCancellation() during the long-running synchronous function?


I thought I might have overlooked something (there is much text about concurrency), but as you didn't get a simple answer yet, there really seems to be a hole here...
Has anyone already thought of allowing the following?

let resultA = async takesALongTime(100000000)
...
return await min(resultA, resultB)

(I think there is no explanation needed: Either the idea is obvious, or it does not fly ;-)
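For comparison, the feature that later shipped in Swift 5.5 spells this idea as async let. A minimal sketch, with takesALongTime filled in by a placeholder loop and smallest as a made-up wrapper name:

```swift
// Placeholder for an expensive computation: sums 0..<n.
func takesALongTime(_ n: Int) async -> Int {
    var sum = 0
    for i in 0..<n { sum &+= i }
    return sum
}

func smallest() async -> Int {
    // Both child tasks start here and may run concurrently.
    async let resultA = takesALongTime(1_000)
    async let resultB = takesALongTime(10)
    // The await marks the point where both results are actually needed.
    return await min(resultA, resultB)
}
```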

Exactly, there is an API for that.

One thing still confuses me:

  public static func withUnsafeContinuation<T>(
    operation: (UnsafeContinuation<T>) -> ()
  ) async -> T { ... }

Will this operation run on a different thread than its caller? Otherwise it will block the calling thread.

If the answer is yes, does withGroup have similar semantics? Will the body passed to withGroup also be running on a different thread?

Thanks!

It's packaging up the "rest of the current function" into a continuation (the UnsafeContinuation instance) that you can use in a completion handler closure. When that completion handler gets called, the rest of your function continues. It's glue for completion-handler APIs.

withGroup is very different. It helps you manage a set of child tasks that run concurrently.

Doug
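A minimal sketch of that glue, assuming the shape the API later shipped with in Swift 5.5 (a free function withUnsafeContinuation, and UnsafeContinuation<T, Never>). fetchAnswer and fetchAnswerAsync are hypothetical names; the dispatch queue only stands in for whatever the existing API does internally:

```swift
import Foundation

// Hypothetical completion-handler API standing in for existing code.
func fetchAnswer(completion: @escaping @Sendable (Int) -> Void) {
    DispatchQueue.global().async {
        completion(42)
    }
}

func fetchAnswerAsync() async -> Int {
    let value = await withUnsafeContinuation { (continuation: UnsafeContinuation<Int, Never>) in
        // This closure only starts the request and returns right away;
        // it does not wait for the answer.
        fetchAnswer { answer in
            // Resuming hands control back to the suspended function.
            continuation.resume(returning: answer)
        }
    }
    // Everything from here on is the "rest of the current function".
    return value
}
```

The calling task is suspended rather than blocked while the request is in flight, so the body does not need to run on another thread.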

withUnsafeContinuation is the right choice for converting an old school completion-handler based function into an async primitive.

But my question is:

Do we need to manually resort to DispatchQueue/OperationQueue to turn a long-running (e.g., compute-intensive) synchronous function into a non-blocking async primitive (to take full advantage of structured concurrency and run several in parallel)? Or is there a Task API ready for that?

I originally thought runDetached was the way to go, before @kirilltitov kindly pointed out that it was withUnsafeContinuation I should look at. Plus, I don't want the long-running task(s) to be detached from the invoking scope, which is what runDetached offers.

In a trivial sense, if by "blocking" you mean long-running, you can wrap any synchronous function in an async function and then it's an async function, which you can schedule however you want. (Note that async by itself doesn't imply anything about thread scheduling.)

In a deeper sense: no, you can't take actually-blocking code (i.e., code which puts a system thread in a blocked state) and make it non-blocking without rewriting it. It needs to be audited for internal blocking calls (like synchronous I/O), have those rewritten to use async alternatives, and also be audited for assumptions of atomicity that get violated by the addition of suspension points.

If this could be fully automated, there wouldn't be a need for special syntax.

Given a long-running pure function (performs no I/O, yields no side effect, only compute-intensive, only depends on the value-typed input arguments, so nothing to audit I guess):

func calculateSync(input: Int) -> Int { /* minutes long */ return 42 }

calculateSync is synchronous and takes minutes to finish. In order to calculate multiple values in parallel, via the proposed structured concurrency APIs, calculateAsync is required:

func calculateInParallel() async throws -> [Int] {
  await try Task.withGroup(resultType: (Int, Int).self) { group in 
    var values: [Int] = Array(0..<8)
  
    for idx in values.indices {
      await try group.add { 
        (idx, await calculateAsync(values[idx]))
      }
    }

    while let (idx, computed) = await try group.next() {
      values[idx] = computed
    }
    
    return values
  }
}

How should calculateAsync be implemented? Can Task.withUnsafeContinuation alone make it happen, or is DispatchQueue.async also needed? If DispatchQueue.async is still required, can we call Task.checkCancellation() there?

Thanks.


I think you could use Task.runDetached in this case, then use structured concurrency to manage its lifetime (async let or a TaskGroup like in your example). If it's very compute intensive, I guess you might also want to schedule it on a dispatch queue and use that to manage how many run simultaneously.
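As a rough sketch of the TaskGroup route, here is how this looks with the API that later shipped in Swift 5.5 (withThrowingTaskGroup and addTask rather than the pitched Task.withGroup and group.add). It assumes the synchronous function can be edited to poll for cancellation; the loop body is a placeholder for the real work:

```swift
// Stand-in for the minutes-long pure function. Task.checkCancellation() is
// not itself async, so synchronous code running inside a task can call it.
func calculateSync(input: Int) throws -> Int {
    var total = 0
    for step in 0..<1_000 {
        if step % 100 == 0 {
            try Task.checkCancellation()   // throws if the task was cancelled
        }
        total += input                     // placeholder for real work
    }
    return total
}

func calculateInParallel() async throws -> [Int] {
    try await withThrowingTaskGroup(of: (Int, Int).self) { group in
        var values = Array(0..<8)
        for idx in values.indices {
            let input = values[idx]        // copy, so the child task captures a constant
            group.addTask {
                (idx, try calculateSync(input: input))
            }
        }
        // Results arrive in completion order; the index restores the position.
        for try await (idx, computed) in group {
            values[idx] = computed
        }
        return values
    }
}
```

The child tasks stay attached to the calling task, so cancelling the parent is visible to each checkCancellation() call. How many of them actually run at the same time is decided by the runtime's executor, not by this code.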

First, I don't think the 8 concurrent child tasks are detached by nature, because:

  1. They need to be cancelled if their parent task is cancelled (they need to perform Task.checkCancellation() periodically).
  2. They also need completion observing.

Second, and most important, I'm not sure whether Task.runDetached starts its closure on another thread. If it doesn't, there won't be 8 concurrent tasks, only 1.

So I'm not sure runDetached is the correct way to do this. Please correct me if I'm wrong.

There's one final (off-topic) question. The code below still has too much ceremony IMHO:

func calculateInParallel() async throws -> [Int] {
  await try Task.withGroup(resultType: (Int, Int).self) { group in 
    var values: [Int] = Array(0..<8)
  
    for idx in values.indices {
      await try group.add { 
        (idx, await calculateAsync(values[idx]))
      }
    }

    while let (idx, computed) = await try group.next() {
      values[idx] = computed
    }
    
    return values
  }
}

I think the above code block is equivalent to:

func calculateInParallel() async throws -> [Int] {
  await Array(0..<8).map { calculateAsync($0) }
}

Why won't the language offer such a capability? Please let me know if this has been discussed before.

To chime in with one key insight here: TaskGroups are low level by design. They are what is used to build convenient operators. It's not yet clear what the stdlib will or will not ship in terms of convenience functions, but if parallelism over a dynamic number of tasks is needed, task groups are what is used to implement it.

Think about it this way:

  • streams: ordered signals, "long", element-by-element processing; back-pressure as known thanks to the reactive-streams standard
  • groups: unordered signals, "wide", parallel processing or "scatter gather" style tasks, with control over the breadth of the processing; back-pressure as defined by a specific group (suspending the add)
  • actors: "isolated" data, events linearized by mailbox, but can be sent by any other task, reacting to events from many places; back-pressure here is implicit: by using async calls to invoke actors we prevent starting too many calls until after the previous ones have completed

They are primitives. What kinds of operators and higher-level abstractions one can build and name on top of them is obviously a very large space, but we are not in the business of defining them all at once. We need to get the indivisible abstractions right, such that the operations can be built on them.

Specific task group application examples:

  • select {}: spawn many tasks, get the first one; this is basically like Go's select builtin
  • first(n:) { ... }: spawn some tasks concurrently (this is different than first(n) on a stream); take the first n, cancel the others (many variations here about what to do about errors)
  • collect(n:): regardless of errors, attempt to collect at least n results; cancel the rest
  • map: with limited parallelism
  • quorum(...): spawns n tasks, awaits n/2 + 1
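To illustrate the "map with limited parallelism" item, here is one way such an operator could be layered on a task group. It is written against the API that later shipped in Swift 5.5 (withTaskGroup, addTask), and concurrentMap with its maxConcurrent parameter is a hypothetical helper, not something the stdlib provides:

```swift
extension Sequence where Element: Sendable {
    // Hypothetical convenience built on a task group: maps concurrently,
    // keeps at most `maxConcurrent` child tasks in flight, preserves order.
    func concurrentMap<T: Sendable>(
        maxConcurrent: Int,
        _ transform: @escaping @Sendable (Element) async -> T
    ) async -> [T] {
        precondition(maxConcurrent > 0, "need at least one slot")
        let items = Array(self)
        return await withTaskGroup(of: (Int, T).self) { group in
            var results = [T?](repeating: nil, count: items.count)
            var inFlight = 0
            for (index, item) in items.enumerated() {
                // Back-pressure: once all slots are taken, wait for one
                // child to finish before adding the next.
                if inFlight == maxConcurrent, let (done, value) = await group.next() {
                    results[done] = value
                    inFlight -= 1
                }
                group.addTask { (index, await transform(item)) }
                inFlight += 1
            }
            // Drain whatever is still running.
            for await (done, value) in group {
                results[done] = value
            }
            return results.compactMap { $0 }
        }
    }
}
```

A call site would then look like `await [1, 2, 3, 4, 5].concurrentMap(maxConcurrent: 2) { $0 * 2 }`. The other operators in the list differ mainly in when they stop collecting and cancel the remaining children.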