[Pitch #2] Structured Concurrency

Syntactically I think it would be better to have async replace await directly in an "async let" statement instead of being lifted to be in front of the let.
e.g:

{
let ok = async "ok",
    (yay, nay) = async ("yay", throw Nay())
  await ok
  try await yay
}
1 Like

The same way as escaping closures -- they cannot close over things which would be unsafe to access.

Doug had a good example of this recently, as he implemented the checks for local variables as well and spotted a bug in an example we had: https://twitter.com/dgregor79/status/1341941190896185352

I don't see it spelled out explicitly in the proposal, but if falls out of the semantics of what an async let initializer is:

The simplest way to think about it is that anything to the right hand side of the = of an async let is initiated together (as-if in an asynchronous closure), implying that if any of the values initialized by this closure throws, all other left-hand side to-be-initialized variables must also be considered as it they had thrown that error.

I.e. it's a concurrent closure, and thus all checks that are made for those apply to child tasks as well.

Could be good to add a specific example about this in to the proposal.

2 Likes

Right, my question was: if a group has two children A and B both running on a non-exclusive executor and B has a cancellation handler installed, then if a partial task for both A and B are executing concurrently and A throws, what is the semantics for execution of B’s handler relative to its currently executing partial task? Based on your earlier post it sounds like the cancellation handler is executed concurrently with the task. Is that correct?

In general, can you elaborate in more detail about non-exclusive executors? The proposal is very sparse in details about those.

How can structured concurrency work with an API like

https://developer.apple.com/documentation/foundation/urlsession/1407613-datatask ?

Ideally I want to be able to write a function

func asyncDataTask(with: url: URL) async -> (Data?, URLResponse?, Error?) {
 ...
 let dataTask = URLSession.shared.dataTask(with: url) ... ??? ...
}

// Used like this:
 async let result = asyncDataTask(with: url)

My goal is to have asyncDataTask work with async let and automatically handle Task cancelation.

When I try to write asyncDataTask, I am running into the following issues:

  • URLSession.dataTask() returns a URLSessionDataTask object, so it doesn't get the
    automated conversion to async that is provided for simpler asynchronous APIs that return void.
  • There doesn't seem to be a way of registering a cancelation handler to call dataTask.cancel() when the implicit "let async" task is canceled.

It seems unlikely URLSession's current API will map directly to async functions, unless a new overlay is created, as URLSessionTasks are created synchronously and must be manually resumed. So I think you'd use the Task APIs directly to wrap those calls, returning the data types you want. But I haven't yet fully explored the proposal.

Right, the question is if it's possible to wrap the URLSessionDataTask inside an ordinary async function, for the situation where we don't want to expose the fancy URLSessionDataTask features.

I see how I can use the Task api to wrap URLDataTask. But I don't see how to use the resulting Task object to produce a function with the signature

func asyncDataTask(with: url: URL) async -> (Data?, URLResponse?, Error?)

Yet it seems like something that should be possible.

Not unless you're okay losing access to the URLSessionTask. Otherwise you'd likely want to package everything in an actor (similar to what Alamofire does now, manually, with our Request types) which manages access to the URLSessionTask and response state.

But again, I haven't fully explored everything here.

Yeah, I'm happy to lose access to the URLSessionTask as long as it's still cancelable through the Task API. I guess I need to noodle around with Task, TaskHandle.get and related APIs and see if there's some way to make things work.

extension URLSession {
    func asyncDataTask(with url: URL) async -> (Data?, URLResponse?, Error?) {
        await withUnsafeContinuation { continuation in
            dataTask(with: url) { data, response, error in
                continuation.resume(returning: (data, response, error))
            }
        }
    }
}

Nice! That still needs the dataTask to be resumed(), and for the dataTask cancelation to be hooked up to the Task cancelation, but I bet it's on the right track.

extension URLSession {
  func asyncDataTask(with url: URL) async -> (Data?, URLResponse?, Error?) {
    var dataTask: URLSessionDataTask?
    return await try Task.withCancellationHandler(handler: {
      if let dataTask = dataTask {
        dataTask.cancel()
      }
    }) {
      await withUnsafeContinuation { continuation in
        dataTask = self.dataTask(with: url) { data, response, error in
          continuation.resume(returning: (data, response, error))
        }
        // Do I have to check for the Task already being canceled at this point?
        dataTask!.resume()
      }
    }
  }
}

Might work, but I get an "Abort trap: 6" error when I try to compile that on the Dec 23rd Swift toolchain release.... Rewriting as below looks cleaner, it doesn't have the race condition, but it also doesn't compile due to a type error:

extension URLSession {
  func asyncDataTask(with url: URL) async -> (Data?, URLResponse?, Error?) {
    withUnsafeThrowingContinuation { continuation in
      let dataTask = self.dataTask(with: url) { data, response, error in
        continuation.resume(returning: (data, response, error))
      }
      await try Task.withCancellationHandler(handler: {
        dataTask.cancel()
      }) {
        dataTask.resume()
      }
    }
  }
}

Due to "Invalid conversion from throwing function of type '(UnsafeThrowingContinuation<(Data?, URLResponse?, Error?)>) async throws -> Void' to non-throwing function type '(UnsafeThrowingContinuation<(Data?, URLResponse?, Error?)>) -> Void'"

What I’m trying to point out is that your question can be broken down. You’re setting up a complex scenario where a task (P) has two child tasks, one of which (X) throws and (presumably) ultimately causes the parent task to exit the scope in which the child tasks were created. That is an elaborate way of saying that the other child task (Y) is cancelled. From Y’s perspective, that cancellation is no different from any other kind of cancellation, which is to say that it is (potentially) concurrent with Y’s execution; specifically, any cancellation handlers Y has may run concurrently with the normal execution of Y. If Y only needs to do something non-concurrently on cancellation, it can of course just check at any time during its normal execution whether it has been cancelled, and it doesn’t need to mess around with cancellation handlers at all.

Well, the use of URLSession.dataTask, with all its extra features, may be something of a red herring here. I just want to implement a simple "fetch this URL" async function that works correctly with structured concurrency.

The following compiles, but fails at runtime with "Fatal error: withCancellationHandler(handler:operation:) not implemented yet."

(Plus resulting task is detached, so doesn't automatically get canceled.)

I'll give up on cancellation for now. :stuck_out_tongue:

extension URLSession {
  func asyncDataTask(with url: URL) async -> (Data?, URLResponse?, Error?) {
    await withUnsafeContinuation { continuation in
      let dataTask = self.dataTask(with: url) { data, response, error in
        continuation.resume(returning: (data, response, error))
      }
      _ = Task.runDetached {
        return await try! Task.withCancellationHandler(handler: {
          dataTask.cancel()
        }) {
          dataTask.resume()
        }
      }
    }
  }
}

Pardon me for adding onto this comment, but as a new member of the forum I'm not allowed to reply more than three times. I ported a toy web scraper app to Structured Concurrency. I wanted to report that I ran into 3 issues:

  1. The above issues adapting URLSession.dataTask()
  2. I wrote an asyncMap() method using Task.withGroup, and it required that transform closure be marked "@escaping" which doesn't seem right. Is there a way of avoiding "@escaping"?
  3. When I tried to return results from async code to sync code, across a runAsyncAndBlock boundary I got a "Local var is unsafe to reference in code that may execute concurrently" warning. Is there a way of fixing that?

For reference, here's my asyncMap, based on the chopVeggies code from the Structured Concurrency doc:

extension Array {
  func asyncMap<B>(_ transform: @escaping (Element) async throws -> B) async throws -> [B] {
    await try Task.withGroup(resultType: (Int, B).self) { group in
      for i in self.indices {
        await group.add {
          (i, await try transform(self[i]))
        }
      }
      var result = [B?](repeating: nil, count: count)
      while let (index, transformed) = await try group.next() {
        result[index] = transformed
      }
      return result.map { $0! }
    }
  }
}

It seems… inharmonious… that non-throwing tasks are represented by Task.Handle where Failure == Never, but UnsafeContinuation and UnsafeThrowingContinuation are distinct types. Wouldn’t it make sense to have:

// Generic arguments renamed to match Task.Handle instead of single letters
struct UnsafeContinuation<Success, Failure: Error> {
  func resume(returning: Success)
  func resume(throwing: Failure)
}

func withUnsafeContinuation<Value>(
    operation: (UnsafeContinuation<Value, Never>) -> Void
) async -> Value

func withUnsafeThrowingContinuation<Success>(
    operation: (UnsafeContinuation<Success, Error>) -> Void
) async throws -> Success
4 Likes

Simpler yet for the surface model, this can be spelled async(nonawaiting) or @nonawaiting func ... async.

Note that for full functionality, this would have to be accompanied by the introduction of a withoutActuallyAwaiting (or withoutActuallySuspending) so that the implementation of these functions can call other async functions that haven't been annotated as non-suspending but are known never to suspend.

You need to use withoutActuallyEscaping, which is quite apt since transform does escape group.add. It just doesn't escape Task.withGroup.

I still haven't got an answer to that question. I find it strange that group.add requires an await but async let (which is supposed to be sugar over it) doesn't.

Yes, I set up a complex scenario intentionally because I’m trying to understand the behavior of cancellation handlers.

So the cancellation handler does not is not executed by the executor of the task that registered it, right? Is the expectation that a cancellation handler be thread safe then? If so, should the method be called withUnsafeCancellationHandler? I think this detail could surprise a lot of people the way it is written right now.

I see handler: /* @concurrent */ () -> Void in the signature in the proposal. I think I understand why /* @concurrent */ is included now but it is not explained clearly.

Also, can you please update the proposal to provide an example of intended usage of cancellation handlers? This seems like an important topic that deserves at least one example.

Finally, I will ask again for more explanation of the behavior of non-exclusive executors.

There's no API for this. A function "knowing" its executor means that it can hop back to that executor after (e.g.) calling out to another async function that might change executors (say, because it runs on an actor).

We haven't exposed the ability to provide a custom executor beyond the enqueue operation of actors. Some day, probably, but I don't expect it will be part of this round of proposals.

I see your point here, but highest, lowest, etc. are almost entirely devoid of meaning. Having the UI metaphor at least gives some sense of when to use the priorities, even if it's not exactly the domain.

The actor's enqueue is the only mechanism was have for this. Custom executors don't have API yet.

Doug

Ok, so task group & async let does await for their child tasks (subtasks) before exiting the scope (as it should, to be structural). It's actually pretty clear from the draft, and got repeated a few time. It just somehow went over my head during the reading...

One more question, it does seem that the task groups (and async let) cancels all of their subtasks only when they exit the scope by throwing an error. Is there a reason we don't want to cancel if the scope exits normally (by reaching the end or using return)?

I don't want the nuance of why the scope ends to differentiate the cancellation behaviour, especially when "error reporting" doesn't limit to throwing errors, but also returning Bool or Optional.

Thanks! AIUI Swift will need to add a new version of withoutActuallyEscaping to handle async closures, or the existing implementation needs to add the proposed reasync feature.

Terms of Service

Privacy Policy

Cookie Policy