[Concurrency] Continuations for interfacing async tasks with synchronous code

Hi everyone. The UnsafeContinuation API, which exposes async task continuations so they can be used to interface with existing synchronous event-based code, were buried somewhat arbitrarily in the Structured Concurrency proposal. Since there are interesting things to discuss about this API particularly, and it's an important part of interfacing with existing APIs, so we thought it deserved a proposal of its own:

https://github.com/jckarter/swift-evolution/blob/continuation-proposal/proposals/ABCD-continuation.md

Follow the github link above to see any eventual revisions. Here's my initial draft inline:


Continuations for interfacing async tasks with synchronous code

Introduction

Asynchronous Swift code needs to be able to work with existing synchronous
code that uses techniques such as completion callbacks and delegate methods to
respond to events. Asynchronous tasks can suspend themselves on
continuations which synchronous code can then capture and invoke to
resume the task in response to an event

Swift-evolution thread:

Motivation

Swift APIs often provide asynchronous code execution by way of a callback. This
may occur either because the code itself was written prior to the introduction
of async/await, or (more interestingly in the long term) because it ties in
with some other system that is primarily event-driven. In such cases, one may
want to provide an async interface to clients while using callbacks internally.
In these cases, the calling async task needs to be able to suspend itself,
while providing a mechanism for the event-driven synchronous system to resume
it in response to an event.

Proposed solution

The library will provide APIs to get a continuation for the current
asynchronous task. Getting the task's continuation suspends the task, and
produces a value that synchronous code can then use a handle to resume the
task. Given a completion callback based API like:

func beginOperation(completion: (OperationResult) -> Void)

we can turn it into an async interface by suspending the task and using its
continuation to resume it when the callback is invoked, turning the argument
passed into the callback into the normal return value of the async function:

func operation() async -> OperationResult {
  // Suspend the current task, and pass its continuation into a closure
  // that executes immediately
  return await withUnsafeContinuation { continuation in
    // Invoke the synchronous callback-based API...
    beginOperation(completion: { result in
      // ...and resume the continuation when the callback is invoked
      continuation.resume(returning: result)
    }) 
  }
}

Detailed design

Raw unsafe continuations

The library provides two functions, withUnsafeContinuation and
withUnsafeThrowingContinuation, that allow one to call into a callback-based
API from inside async code. Each function takes an operation closure,
which is expected to call into the callback-based API. The closure
receives a continuation instance that must be resumed by the callback,
either to provide the result value or (in the throwing variant) the thrown
error that becomes the result of the withUnsafeContinuation call when the
async task resumes:

struct UnsafeContinuation<T> {
  func resume(returning: T)
}

func withUnsafeContinuation<T>(
    _ operation: (UnsafeContinuation<T>) -> ()
) async -> T

struct UnsafeThrowingContinuation<T> {
  func resume(returning: T)
  func resume(throwing: Error)
  func resume<E: Error>(with result: Result<T, E>)
}

func withUnsafeThrowingContinuation<T>(
    _ operation: (UnsafeThrowingContinuation<T>) -> ()
) async throws -> T

The operation must follow one of the following invariants:

  • Either the resume function must only be called exactly-once on each
    execution path the operation may take (including any error handling paths),
    or else
  • the resume function must be called exactly at the end of the operation
    function's execution.

Unsafe*Continuation is an unsafe interface, so it is undefined behavior if
these invariants are not followed by the operation. This allows
continuations to be a low-overhead way of interfacing with synchronous code.
Wrappers can provide checking for these invariants, and the library will provide
one such wrapper, discussed below.

Using the Unsafe*Continuation API, one may for example wrap such
(purposefully convoluted for the sake of demonstrating the flexibility of
the continuation API) function:

func buyVegetables(
  shoppingList: [String],
  // a) if all veggies were in store, this is invoked *exactly-once*
  onGotAllVegetables: ([Vegetable]) -> (),

  // b) if not all veggies were in store, invoked one by one *one or more times*
  onGotVegetable: (Vegetable) -> (),
  // b) if at least one onGotVegetable was called *exactly-once*
  //    this is invoked once no more veggies will be emitted
  onNoMoreVegetables: () -> (),
  
  // c) if no veggies _at all_ were available, this is invoked *exactly once*
  onNoVegetablesInStore: (Error) -> ()
)
// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
  try await withUnsafeThrowingContinuation { continuation in
    var veggies: [Vegetable] = []

    buyVegetables(
      shoppingList: shoppingList,
      onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
      onGotVegetable: { v in veggies.append(v) },
      onNoMoreVegetables: { continuation.resume(returning: veggies) },
      onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
    )
  }
}

let veggies = try await buyVegetables(shoppingList: ["onion", "bell pepper"])

Thanks to weaving the right continuation resume calls into the complex
callbacks of the buyVegetables function, we were able to offer a much nicer
overload of this function, allowing async code to interact with this function in
a more natural straight-line way.

Checked continuations

Unsafe*Continuation provides a lightweight mechanism for interfacing
sync and async code, but it is easy to misuse, and misuse can corrupt the
process state in dangerous ways. In order to provide additional safety and
guidance when developing interfaces between sync and async code, the
library will also provide a wrapper which checks for invalid use of the
continuation:

struct CheckedContinuation<T> {
  func resume(returning: T)
}

func withCheckedContinuation<T>(
    _ operation: (CheckedContinuation<T>) -> ()
) async -> T

struct CheckedThrowingContinuation<T> {
  func resume(returning: T)
  func resume(throwing: Error)
  func resume<E: Error>(with result: Result<T, E>)
}

func withCheckedThrowingContinuation<T>(
    _ operation: (CheckedThrowingContinuation<T>) -> ()
) async throws -> T

The API is intentionally identical to the Unsafe variants, so that code
can switch easily between the checked and unchecked variants. For instance,
the buyVegetables example above can opt into checking merely by turning
its call of withUnsafeThrowingContinuation into one of withCheckedThrowingContinuation:

// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
  try await withCheckedThrowingContinuation { continuation in
    var veggies: [Vegetable] = []

    buyVegetables(
      shoppingList: shoppingList,
      onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
      onGotVegetable: { v in veggies.append(v) },
      onNoMoreVegetables: { continuation.resume(returning: veggies) },
      onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
    )
  }
}

Instead of leading to undefined behavior, CheckedContinuation will instead
ignore attempts to resume the continuation multiple times, logging a warning
message. CheckedContinuation will also log a warning if the continuation
is discarded without ever resuming the task, which leaves the task stuck in its
suspended state, leaking any resources it holds.

Alternatives considered

Name CheckedContinuation just Continuation

We could position CheckedContinuation as the "default" API for doing
sync/async interfacing by leaving the Checked word out of the name. This
would certainly be in line with the general philosophy of Swift that safe
interfaces are preferred, and unsafe ones used selectively where performance
is an overriding concern. There are a couple of reasons to hesitate at doing
this here, though:

  • Although the consequences of misusing CheckedContinuation are not as
    severe as UnsafeContinuation, it still only does a best effort at checking
    for some common misuse patterns, and it does not render the consequences of
    continuation misuse entirely moot: dropping a continuation without resuming
    it will still leak the un-resumed task, and attempting to resume a
    continuation multiple times will still cause the information passed through
    the continuation to be lost. It is still a serious programming error if
    a with*Continuation operation misuses the continuation;
    CheckedContinuation only helps make the error more apparent.
  • Naming a type Continuation now might take the "good" name away if,
    after we have move-only types at some point in the future, we want to
    introduce a continuation type that statically enforces the exactly-once
    property.
15 Likes

Maybe we can crash the program on the second call to CheckedContinuation.resume and on deinit if the resume is never called.

They do sound like logic errors.


Should we also include TaskAPIs (cancellation, task local) in UnsafeContinuation ?

extension UnsafeContinuation {
  var currentTask: Task.Handle { ... }
}
4 Likes

Instead of having UnsafeContinuation be defined as so:

struct UnsafeContinuation<T> {
  func resume(returning: T)
}

func withUnsafeContinuation<T>(
    _ operation: (UnsafeContinuation<T>) -> ()
) async -> T

struct UnsafeThrowingContinuation<T> {
  func resume(returning: T)
  func resume(throwing: Error)
  func resume<E: Error>(with result: Result<T, E>)
}

func withUnsafeThrowingContinuation<T>(
    _ operation: (UnsafeThrowingContinuation<T>) -> ()
) async throws -> T

I think that it would be better to be mostly typed-throws agnostic and have:

struct UnsafeContinuation<T, E: Error> {
  func resume(returning: T)
  func resume(throwing: E)
  func resume(with result: Result<T, E>)
}

func withUnsafeContinuation<T>(
    _ operation: (UnsafeContinuation<T, Never>) -> ()
) async -> T

typealias UnsafeThrowingContinuation<T> = UnsafeContinuation<T, Error>

func withUnsafeThrowingContinuation<T>(
    _ operation: (UnsafeThrowingContinuation<T>) -> ()
) async throws -> T

And likewise for CheckedContinuation.

1 Like

I'd like to discuss whether it's better for it to trap rather than silently swallow multiple calls – it seems the possibilities for unrealized data loss could be significant otherwise.

16 Likes

I don't think we want to provide a separate API to access the task handle from here; if we allow you to get a task handle for the current task, you can capture that in the closure instead. If we don't allow you to get a task handle for the current task, we shouldn't have this API either. Either way, this extension is something that we should consider as part of Structured Concurrency. Joe's proposal is more of an extension of async/await than a part of Structured Concurrency.

Doug

4 Likes

+1 for trapping in face of abuse CheckedContinuation.

Definitely for resuming more-than-once, that's easy to detect with just an atomic; crashing is a much better idea than ignoring silently the APIs may be "checked" but still low-level and one should know what they're doing when using those and definitely fix wrong usage.


CheckedContinuation will also log a warning if the continuation is discarded without ever resuming the task, which leaves the task stuck in its suspended state, leaking any resources it holds.

Shouldn't we also crash when a checked continuation is dropped without ever resuming rather than just printing a warning at runtime? Pretty sure such warnings will remain ignored, you know how life goes... I would find this to be a helpful util in debugging "am I accidentally dropping continuations somehow?" even if this would be opt-in with some flag or env variable.


The operation must follow one of the following invariants:

  • Either the resume function must only be called exactly-once on each execution path the operation may take (including any error handling paths), or else
  • the resume function must be called exactly at the end of the operation function's execution.

Was this slightly reworded or am I imagining it? The wording is a bit unclear to me... Isn't it both those invariants rather than one of them? I understand "at end of function" to be somewhat the same as "on every path", since each execution path leads to some "end of the function" same with return or throw, what am I missing here?


Don't expose UnsafeContinuation
...

This section is missing any text, seems like an omission?

Though I guess the wording there would just be "that's a bad idea, we need it" :wink:

3 Likes

Can the continuation APIs be nested within the Result enum?

  • The Success and Failure types won't need to be inferred.
  • There's no need for separate throwing and non-throwing APIs.
extension Result {

  public struct UnsafeContinuation {

    // Or use `callAsFunction(_:)` instead?
    public func resume(_ result: Result)
  }

  public static func withUnsafeContinuation(
    _ body: (UnsafeContinuation) -> Void
  ) async -> Self
}

Example:

extension Data {

  init(asyncContentsOf url: URL) async throws {
    let result = await Result<Data, Error>.withUnsafeContinuation {
      continuation in
      let request = URLRequest(url: url)
      let task = URLSession.shared.dataTask(with: request) {
        data, _, error in
        if let error = error {
          continuation.resume(.failure(error))
        } else {
          continuation.resume(.success(data ?? Data()))
        }
      }
      task.resume()
    }
    self = try result.get()
  }
}

To avoid forced try!, a non-throwing API could also be added:
(But it would be source-breaking with -warnings-as-errors.)

extension Result where Failure == Never {

  public func get() -> Success {
    switch self {
    case .success(let success):
      return success
    }
  }
}
2 Likes

NB. As @Douglas_Gregor suggests, perhaps cancellation should be considered as part of Structured Concurrency. For that reason I cross-posted to that pitch topic.

I believe the *Continuation API needs cancellation support. A wrapped callback-based API could be performing significant work, and it’d be wasteful to let that continue after the wrapping task is cancelled.

@Lantua already hinted at this yesterday, suggesting to expose the current Task.Handle, but IIUC those are meant to control the task from the outside, whereas with this continuation API we’re really “inside” a task. Task.Handle would expose the cancel method (which cancels the task when called), whereas what we may need here is to define what happens after the task was cancelled.

I believe we need to be able to:

  • check if the wrapping task was already cancelled;
  • install a cancellation handler where we could instruct the wrapped API to cancel.

I propose adding isTaskCancelled methods and taskCancellationHandler variables. In order to be able to set the latter, the *Continuation would have to be passed inout to the closure passed to with*Continuation. Using the simplest example from the pitch document for clarity:

// Continuation API (throwing & checked variants omitted)

struct UnsafeContinuation<T> {
	func resume(returning: T)

	func isTaskCancelled() -> Bool
	var taskCancellationHandler: () -> Void = {} // Initially nop
}

func withUnsafeContinuation<T>(
	_ operation: (inout UnsafeContinuation<T>) -> Void
) async -> T


// Example completion callback-based API with cancellation handle

struct OperationHandle {
	func cancel()
}

func beginOperation(completion: (OperationResult) -> Void) -> OperationHandle


// Example

func operation() async -> OperationResult {
	return await withUnsafeContinuation { continuation in
		let operationHandle = beginOperation(completion: { result in
			guard !continuation.isTaskCancelled() else { return }
			continuation.resume(returning: result)
		})
		
		continuation.taskCancellationHandler = {
			operationHandle.cancel()
		}
	}
}

I'm not sure that's a good idea, because (a) we can't tell what type the Handle has from an individual continuation, so it would need to be erased somehow, and (b) it increases the possibility for nonlocal effects on the task from the suspending block beyond just resuming the task.

Crashing when a continuation gets resumed multiple times would be reasonable, since that would normally corrupt the process. On the other hand, dropping the continuation without resuming it, without checking, would normally still leave the process in a defined state. Also, the way we detect dropped continuations currently is also via a class's deinit, which can be imprecise due to the vagaries of refcounting and ARC optimization. So having checking introduce a trap in code that would otherwise run mostly normally, and furthermore introduce that trap in a way that its exact timing could depend on optimization settings, gives me some pause.

I don't think this requires specific API on the continuation. The task's own code can arrange to cancel itself in response to a specific state passed in through the continuation.

I think what you mean is that the value of OperationResult passed to *Continuation.resume(returning:) could be inspected, and if that value passes some test the task should be considered cancelled? That’s not the scenario I’m referring to — rather I’m referring to a scenario where the task itself is cancelled first, and that cancellation needs to be propagated to the wrapped callback-based API lest it continue doing work that’s no longer required.

Take for example this usage of withUnsafeThrowingContinuation wrapping URLSession.dataTask:

func download(url: URL) async throws -> Data? {
	return try await withUnsafeThrowingContinuation { continuation in
		let urlSessionTask = URLSession.shared.dataTask(with: url) { data, _, error in
			// EDIT: Lantua notes below that this next line shouldn’t be here
			//guard !continuation.isTaskCancelled() else { return }
			if let error = error {
				continuation.resume(throwing: error)
			} else {
				continuation.resume(returning: data)
			}
		}
		urlSessionTask.resume()

		continuation.taskCancellationHandler = {
			urlSessionTask.cancel()
		}
	}
}

Here’s some context to further clarify. IIUC when one of two concurrent child tasks fails, the other gets cancelled. I.e. in the example below when the “bad” download’s task fails and throws an error, the “good” download’s task gets cancelled. That cancellation should propagate and eventually call URLSessionTask.cancel to avoid unnecessarily downloading more of the “good” data.

func main() async -> Void {
    async let goodDownloadData = download(url: URL(string: "https://example.com/good-data")!)
    async let badDownloadData = download(url: URL(string: "https://httpbin.org/status/404")!)
    do { print(try await [goodDownloadData, badDownloadData]) }
    catch { print("The “bad” download failed:", error) }
}

We's still want to resume regardless, but we can skip unneeded computations (which is the point of cancellation) so we can resume earlier.

I have the same impression that we need something of similar effect.

1 Like

Ah, yes, re-reading the Structured Concurrency proposal document I believe you’re right. So we shouldn’t have the guard !continuation.isTaskCancelled() else { return } line in my example (edited to comment it out). Possibly we don’t need the *Continuation.isTaskCancelled API at all, then? But I do believe we still need something like *Continuation.taskCancellationHandler.

What if you need a more complex series of event? Load this, then load that, etc. If you cancel early, you can skip parts of the computation (within the same continuation) that would otherwise be wasteful.

Then again, it's hard to see what's absolutely required of the glue code. Probably cancellationHandler at the very least to glue together the new Task cancellation and old sync cancellation.

Hello,

I'm wondering about wrapping async processes that are not based on a completion block.

For example, the completion may be provided by an event or a method call. In such cases, one would like to store the continuation somewhere until it is completed:

func operation() async -> OperationResult {
    return await withUnsafeContinuation { continuation in
        OperationImplementation(continuation: continuation).start()
    }
}

class OperationImplementation {
    let continuation: UnsafeContinuation<OperationResult>
    func start() {
        // register for some callback
    }
    func callback(result: OperationResult) {
        continuation.resume(returning: result)
    }
}
  • Is it a pattern that is supported? Fostered?
  • How is the implementation supposed to check for cancellation?
1 Like

+1 for trapping. Possibly this is a behavior that varies with -Ounchecked.

Cancellation seems a bit like throws - some tasks are very interested in it, others not so much. To me this suggests we need withUnsafeCancellableContinuation, withUnsafeThrowawableCancellableContinuation as options, leaving the proposed continuations as noncancellable.

Like in Combine AnyCancellable, some callers will be interested in cancelling when the continuation deinits. I am unsure if CheckedContinuation intends to trap/error on the deinit situation, but this is a case where the correct behavior is different between cancellable/noncancellable, which is another reason for having more types.

There is a natural conversion between types, namely that cancellable tasks are a bit like a failure and vice versa. Callers that are uninterested in distinguishing between a task that was cancelled and one that failed ought to be able to interchange these, as to write one branch for success and the other for not-success without having to think in detail about all the reasons the task may no longer be running.

This seems fine (supported), you do want to ensure to not call start() twice though as that’d resume twice which is bad™. Other than that such patterns are fine.

Thank you @ktoso! And what about cancellation checking?

To be honest I find it a bit weird to want this specific API have anything to do with cancellation — it is for contexts which are not within the task infrastructure so by itself it can’t do anything about that.

In [Pitch #2] Structured Concurrency - #116 by ktoso I propose additions or reshaping of the Task APIs such that one would be able to get a Task object, and if one wanted to one could then check it from wherever — including implementations using *Continuations. That’s about as much as we can achieve I think.

3 Likes

@ktoso, do we have to expect developers to wrap many existing asynchronous APIs that support cancellation, such as completion blocks, Combine publishers, other reactive tools, DispatchQueue/thread/mutex/lock-based APIs, etc? Maybe those APIs will eventually be migrated to Swift's async model. But some will not. And this eventual migration will take time.

Meanwhile, do we really want to ignore cancellation?