Proposal to add cancellation abilities for Async/Await


(Doug Stein) #1

I have a proposal to include cancellation support with the upcoming async/await feature, which I wrote up in a fork of Chris Lattner's and Joe Groff's "Async/Await for Swift" gist:

https://gist.github.com/dougzilla32/ce47a72067f9344742e10020ad4c8c41
https://gist.github.com/dougzilla32/ce47a72067f9344742e10020ad4c8c41/revisions

The motivation is to provide a consistent and simple API for cancelling asynchronous tasks. My theory (seemingly shared by some) is that many programmers find supporting cancellation too much trouble, because it is a bit involved and the app may still basically work without it, although performance will likely suffer. With this proposal cancellation becomes simple, so app developers are much more likely to implement it.

I created a prototype to see if the proposal actually works (it does!), which simulates async/await and cancellation: https://github.com/dougzilla32/AsyncCancellation

The idea is to have a flavor of beginAsync that returns something that can be cancelled, which would in turn cancel all enclosed asynchronous operations:

/// Defines an asynchronous task for the purpose of cancellation.
protocol AsyncTask {
    func cancel()
    var isCancelled: Bool { get }
}

/// A list of async tasks along with the associated error handler
public protocol AsyncTaskList: AsyncTask {
    var tasks: [(task: AsyncTask, error: (Error) -> ())] { get }
}

/// Returns an 'AsyncTaskList' that can be used to cancel the enclosed
/// asynchronous tasks.
func beginAsyncTask(_ body: () async throws -> Void) rethrows -> AsyncTaskList

/// Invoking 'task' adds the given 'AsyncTask' to the chain of tasks enclosed by
/// 'beginAsyncTask', or does nothing if enclosed by 'beginAsync'.
func suspendAsync<T>(
  _ body: (_ continuation: @escaping (T) -> (),
           _ error: @escaping (Error) -> (),
           _ task: @escaping (AsyncTask) -> ()) -> ()
) async throws -> T

It is possible to extend AsyncTaskList to add other methods:

/// Add 'suspend' and 'resume' capabilities to AsyncTaskList
extension AsyncTaskList {
    func suspend() { tasks.forEach { ($0.task as? URLSessionTask)?.suspend() } }
    func resume() { tasks.forEach { ($0.task as? URLSessionTask)?.resume() } }
}

/// Extend URLSessionTask to be an AsyncTask
extension URLSessionTask: AsyncTask {
    public var isCancelled: Bool {
        return state == .canceling || (error as NSError?)?.code == NSURLErrorCancelled
    }
}

/// Add async version of dataTask(with:) which uses suspendAsync to handle the callback
extension URLSession {
    func dataTask(with request: URLRequest) async throws -> (request: URLRequest, response: URLResponse, data: Data) {
        return try await suspendAsync { continuation, error, task in
            let dataTask = self.dataTask(with: request) { data, response, err in
                if let err = err {
                    error(err)
                } else if let response = response, let data = data {
                    continuation((request, response, data))
                }
            }
            task(dataTask)
            dataTask.resume()
        }
    }
}

Example usage for async URLSession.dataTask:

func performAppleSearch() async throws -> String {
    let urlSession = URLSession(configuration: .default)
    let request = URLRequest(url: URL(string: "https://itunes.apple.com/search")!)
    let result = try await urlSession.dataTask(with: request)
    if let resultString = String(data: result.data, encoding: .utf8) {
        return resultString
    }
    throw WebResourceError.invalidResult
}

// Execute the URLSession example
do {
    let chain = try beginAsyncTask {
        let result = try await performAppleSearch()
        print("Apple search result: \(result)")
    }
...
    chain.suspend()
...
    chain.cancel()
} catch {
    print("Apple search error: \(error)")
}

More detailed information can be found in the gist and in the prototype.

Cancellation could be implemented separately, but including it with Swift Async/Await would encourage authors of coroutines to provide an AsyncTask to cancel the coroutine.

I'd love to get feedback and suggestions!!

Doug


#2

I agree that cancellation is an important topic around async/await.
I have also been thinking about it recently.

I looked at your implementation.
It is especially elegant that an existing type such as URLSessionTask
can conform to AsyncTask by reusing its existing cancel method.

But I think your design is too coarse-grained for a language feature.
For example, if I understand correctly,
the user cannot control the cancellation group for independent await calls within one async function.


#3

I want to share my idea with you.

First, without any language support for cancellation in async/await,
users can use a pattern like the one below.

// cancel notification sender interface
class CancelContext {
    var token: CancelToken
    func cancel()
}

// cancel notification receiver interface
class CancelToken {
    func register(handler: @escaping () -> Void)
}

// cancellable async functions take a cancel token
func download(url: URL, cancelBy cancelToken: CancelToken) async -> Data
func decodeImage(data: Data, cancelBy cancelToken: CancelToken) async -> UIImage

// the task driver creates the CancelContext
func onStartButton() async {
    let cancelContext = CancelContext()
    self.cancelContext = cancelContext
    let cancelToken = cancelContext.token

    // the user must pass the same cancelToken to every async function
    // that should belong to the same cancellation group.
    let data = await download(url: urlField.text, cancelBy: cancelToken)
    let image = await decodeImage(data: data, cancelBy: cancelToken)
    self.imageView.image = image
}

func onCancelButton() {
    self.cancelContext?.cancel()
}
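
The two interfaces above are declared without bodies. As a rough illustration only, here is a minimal sketch of how they might be filled in. The lock, the `isCancelled` property, and the `fire()` helper are assumptions added for this sketch and are not part of the post.

```swift
import Foundation

/// Receiver side: cancellable work registers a handler to run on cancellation.
final class CancelToken {
    private let lock = NSLock()
    private var handlers: [() -> Void] = []
    private var cancelled = false

    /// Assumption: not in the original interface, added for convenience.
    var isCancelled: Bool {
        lock.lock(); defer { lock.unlock() }
        return cancelled
    }

    func register(handler: @escaping () -> Void) {
        lock.lock()
        if cancelled {
            lock.unlock()
            handler()               // already cancelled: run the handler right away
            return
        }
        handlers.append(handler)
        lock.unlock()
    }

    /// Hypothetical helper, meant to be called only by CancelContext.
    func fire() {
        lock.lock()
        if cancelled { lock.unlock(); return }   // cancelling twice is a no-op
        cancelled = true
        let pending = handlers
        handlers.removeAll()
        lock.unlock()
        pending.forEach { $0() }    // run handlers without holding the lock
    }
}

/// Sender side: owns the token and triggers cancellation.
final class CancelContext {
    let token = CancelToken()
    func cancel() { token.fire() }
}
```

Running the handlers outside the lock is a deliberate choice in this sketch, so that a handler can safely call back into the token.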

To generalize, almost every composition of await calls ends up looking like this.

func start() async {
    let cancelContext = CancelContext()
    self.cancelContext = cancelContext
    let cancelToken = cancelContext.token

    let a = await f(cancelBy: cancelToken)
    let b = await g(a, cancelBy: cancelToken)
    let c = await h(b, cancelBy: cancelToken)
    let d = await k(c, cancelBy: cancelToken)
    ...
}

There is a problem.
Repeating the explicit cancelBy: cancelToken is tedious and too verbose.
The user simply wants to say "these belong to the same group".

So I imagine code like below.

func start() async {
    let cancelContext = CancelContext()
    self.cancelContext = cancelContext

    cancelContext.grouping {

        let a = await f()
        let b = await g(a)
        let c = await h(b)
        let d = await k(c)
        ...

    }
}

If we could write this and have it behave the same as the explicit pattern,
it would be the best solution for async/await with cancellation.

Unfortunately I cannot implement this.

This is similar to autoreleasepool.

https://developer.apple.com/documentation/objectivec/2299644-autoreleasepool

This higher-order function can be thought of as
applying an implicit hidden argument to every autorelease call invoked inside it.
As an implementation detail, the pool is stored in thread-local storage.

The same technique cannot be applied to this cancellation grouping,
because an async function may switch threads.
So thread-local storage is useless for it.

From this, I reached one conclusion.
We need some abstraction for a single consecutive chain of await invocations.
In thread programming, this is Thread.current.threadDictionary.
So, for example, we need something like CoroutineContext.current.contextDictionary.
If that existed, each cancellable function could take its CancelToken from it implicitly.


(Joe Groff) #4

Yeah, since that initial proposal, we identified the need to be able to associate context with a coroutine. See this follow up thread:

As for cancellation, I like the model described by Nathaniel Smith in this blog post:

https://vorpus.org/blog/timeouts-and-cancellation-for-humans/

He lays out the problems with composing operations with cancellation contexts and has some interesting ideas about taking cancellation tokens and applying them in an easy-to-use scoped manner that doesn’t require much boilerplate.


#5

Thanks for the information.
I had missed that thread about coroutine context.
And that blog post seems to say the same thing I wanted to express.
I am relieved to know that others are thinking about cancellation.
I'm looking forward to progress on async/await.


(Jon Hull) #6

That is a wonderful blog post... would love to see something like that natively in Swift's async/await!

Also, rereading the proposal reminded me of something I had argued for earlier, and feel I have a better handle on now that I have been using futures/promises in Swift. Instead of the proposed eventual Future class, I would like to see Swift's built-in support spelled as follows:

//If you use 'async' instead of `await` in front of an async func, you get something like a future
let myFuture = async myThrowingAsyncFunc() 

//To get the actual value out of the future, you must eventually `await` it. 
//If it can have an error, then this is when it will throw (hence the try)
let value = try await myFuture

Internally, this could still be implemented as a class, if desired... but there is also an opportunity for additional compiler optimization that we wouldn't have with a public class. The compiler could reorder statements when appropriate, for example. An internal implementation can also be changed over time more easily.

I would spell the future's type: async T where T is the resulting value type. This would allow us to pass futures between functions safely:

func myFutureTakingFunc(_ future: async T) async {...} 

I use the term "future" above to help understanding, but they could really just as easily be called "asynchronous variables".

let x : async T = ...  // You can use this pretty much like any other var, except you must await it to get the value

The nice thing is that our scoping rules for variables should naturally keep things from escaping, as long as we require these to be declared in an asyncContext.

I realize that this probably won't make the first proposal, but if something like this is desirable (and I believe it to be)... then we really should consider it in the manifesto.


(Joe Groff) #7

I don't think we lose any optimization opportunities making those normal declarations, without special syntax. Similar to the proposed Result design, you could have the Future { ... } initializer box up an async computation, and give Future an unwrapped() method to release the value in an async context later. With our ABI resilience design, there isn't really any implementation hiding advantage normal types have over builtin language features.


(Jon Hull) #8

My gut tells me that there would be lots of places where with the syntax, you wouldn't even have to box-then-unbox anything... but I will defer to your expertise on optimization.

I think my main concern is that I want all of this to feel like a first class part of the language, since in real world apps, it comes up all the time. Futures are great, but I don't think wrapping things in Future {...} (or Result {...}) and then calling unwrapped() feels first class. That is fine for Result, since many of the use cases will be obsoleted by async/await. The idea of being able to pass around asynchronous things however, will come up often enough that it really does warrant special consideration. (Imagine how annoying Optionals would be if you had to wrap everything in Optional{...} and call unwrapped() and flatmap() to use/chain values.)

The advantages of async variables:

  • The syntax fits naturally with async/await and try/throws
  • It doesn't require any additional keywords (just being able to use them in additional places)
  • It can naturally integrate with whatever cancel/timeout features are built for await (i.e. you can just think of await as await)
    • For example: Let's say we spell timeouts await(upTo: 10), then it is spelled the same for async vars, since we are just awaiting their value.
  • It can be added incrementally after async/await
  • It gives us a standard way to pass around asynchronous things (including blocks/functions)
  • It would naturally allow automatic promotion of non-async variables where appropriate (similar to optionals)
    • For example T could promote to async T when passed to an async T parameter
  • It doesn't prevent people from building a Future class on top of it
  • It has a minimal surface area, but still feels like a built-in first class part of the language (similar to optionals and throws)

(Dante Broggi) #10

I think having Async<T> as a compiler-known, but nominal, type is preferable to adding custom syntax. However, I do not think Async<T> should be copyable.

e.g.

/* moveonly */ struct Async<Wrapped> {
  init(do: @escaping () -> Wrapped) { /* … */ }
  __consuming func await() -> Wrapped { /* … */ }
  /* … */
}

(Joe Groff) #11

To me, part of the point of async coroutines is that you don't need to pass values representing async computations around explicitly as much, only when you need to coordinate between multiple independent computations. Future is only one possible coordination mechanism, and not even necessarily the best one. It seems premature to start adding new special case syntax, and every special syntax added is another new thing users have to learn. The basic language feature should start out as simple and flexible as possible.


(Doug Stein) #12

Wow, I'm blown away by all the feedback!!! I took my time responding as I've been updating my experimental project to try and incorporate all the feedback!

  • Regarding Joe Groff's feedback about Contextualizing async coroutines
    • I'm now using the async coroutine context for providing a cancel context
  • Regarding Joe Groff's pointer to Nathaniel Smith's Timeouts and cancellation for humans AND omochimetaru's feedback with CancelContext groups and CancelTokens
    • I've incorporated cancel context for grouping
    • The cancel context can hand you a cancel token for finer granularity
    • I've added support for setting a timeout
  • Regarding Jon Hull's feedback about futures
    • I haven't yet incorporated this into the prototype. At the moment I'm using class Future<T> straight from the original Async/Await proposal.
  • Regarding Dante Broggi's feedback about having Async as a nominal type
    • I'll try this! Not in there at the moment.
  • Also got some feedback about the Progress Foundation class
    • I think Progress can be extended to play nicely with AsyncTask and CancelContext but haven't done this yet.

Here's the revised proposal (current version of the README in the project https://github.com/dougzilla32/AsyncCancellation):

Overview

In this proposal, cancellation and timeout features are implemented using coroutine contexts:

  • A cancel context (class CancelContext) is used to track cancellable asynchronous tasks within coroutines.
  • Tasks are manually added to the cancel context as they are created, and are automatically removed from the cancel context when their coroutine is resolved (i.e. the coroutine produces a result or an error).
  • The cancel context has a cancel() method that can be used to explicitly cancel all unresolved tasks.
  • The cancel context has a timeout: TimeInterval property for setting a timeout to cancel all unresolved tasks.
  • When cancel() is called on a cancel context, all of its unresolved tasks are immediately resolved with the error AsyncError.cancelled. Unwinding and cleanup for the task happen in the background after the cancellation error is thrown.
  • The cancel context is thread safe, therefore the same instance can be used in multiple unrelated calls to beginAsync (coarse granularity).
  • The cancel context can produce cancel tokens for finer granularity of cancellation and timeouts. The cancel token can be used as a coroutine context, which will create a tree-shaped cancellation structure (note: the cancel context is itself a cancel token).
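
To make the tracking and granularity points above concrete, here is a minimal sketch of a lock-protected context that records tasks per token and cancels either all of them or only one token's subset. It is not the code from the AsyncCancellation project: `SketchCancelContext`, `makeTokenId()`, and `FlagTask` are hypothetical names, tokens are reduced to plain ids, and timeouts and automatic removal of resolved tasks are left out.

```swift
import Foundation

/// Same shape as the proposal's AsyncTask, redeclared so the sketch is self-contained.
protocol AsyncTask {
    func cancel()
    var isCancelled: Bool { get }
}

/// Hypothetical, simplified cancel context (assumption: tokens are plain ids).
final class SketchCancelContext {
    private struct Entry {
        let tokenId: UInt
        let task: AsyncTask
    }

    private let lock = NSLock()
    private var entries: [Entry] = []
    private var nextTokenId: UInt = 1    // id 0 stands for the context itself

    /// Hands out a fresh token id for finer-grained cancellation.
    func makeTokenId() -> UInt {
        lock.lock(); defer { lock.unlock() }
        let id = nextTokenId
        nextTokenId += 1
        return id
    }

    /// Registers a task, either on the context (id 0) or under a token id.
    func add(task: AsyncTask, tokenId: UInt = 0) {
        lock.lock(); defer { lock.unlock() }
        entries.append(Entry(tokenId: tokenId, task: task))
    }

    /// Cancels every tracked task, or only those registered under `tokenId`.
    func cancel(tokenId: UInt? = nil) {
        lock.lock()
        let selected = entries.filter { tokenId == nil || $0.tokenId == tokenId }
        entries.removeAll { tokenId == nil || $0.tokenId == tokenId }
        lock.unlock()
        for entry in selected { entry.task.cancel() }   // call out without the lock held
    }
}

/// Trivial task used only to exercise the sketch.
final class FlagTask: AsyncTask {
    private(set) var isCancelled = false
    func cancel() { isCancelled = true }
}

let sketchContext = SketchCancelContext()
let imageToken = sketchContext.makeTokenId()
let download = FlagTask()
let decode = FlagTask()
sketchContext.add(task: download)
sketchContext.add(task: decode, tokenId: imageToken)
sketchContext.cancel(tokenId: imageToken)   // cancels only `decode`
sketchContext.cancel()                      // cancels whatever is still tracked
```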

Examples

// Simple timer example

let cancelContext = CancelContext()
let error: (Error) -> () = { error in
    if error.isCancelled {
        print("Meaning Of Life calculation cancelled!")
    } else {
        print("An unknown error occurred while calculating the Meaning Of Life: \(error)")
    }
}

do {
    try beginAsync(context: cancelContext, error: error) {
        let theMeaningOfLife: Int = try await suspendAsync { continuation, error in
            let workItem = DispatchWorkItem {
                Thread.sleep(forTimeInterval: 0.1)
                continuation(42)
            }
            DispatchQueue.global().async(execute: workItem)
            (getCoroutineContext() as CancelToken?)?.add(task: workItem)
        }
        if theMeaningOfLife == 42 {
            print("The Meaning Of Life is 42!!")
        } else {
            print("Wait, what?")
        }
    }
} catch {
    // Error is handled by the beginAsync 'error' callback
}

// Set a timeout (seconds) to prevent hangs
cancelContext.timeout = 30.0

...

// Call 'cancel' to abort the request
cancelContext.cancel()

main.swift contains the image loading example from 'Async/Await for Swift' by Chris Lattner and Joe Groff.

Prototype Limitations

This implementation has a limitation where suspendAsync blocks the current thread until either continuation or error has been called. According to the Async/Await proposal, suspendAsync is supposed to allow the current thread to resume execution while any of its coroutines are waiting to be resolved.

Implementing suspendAsync properly would require either a custom preprocessor that rewrites the code or compiler support. Both are beyond the scope of this experimental implementation.
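
For readers wondering what such a blocking simulation can look like, here is a minimal sketch built on a semaphore. It is an assumption about the general approach rather than the prototype's actual code, and `blockingSuspendAsync` is a hypothetical name. Unlike the documented behaviour, it silently ignores a second resolution instead of treating it as a fatal error.

```swift
import Foundation

/// Blocking stand-in for `suspendAsync`: the calling thread waits on a semaphore
/// until `continuation` or `error` has been invoked.
func blockingSuspendAsync<T>(
    _ body: (_ continuation: @escaping (T) -> Void,
             _ error: @escaping (Error) -> Void) -> Void
) throws -> T {
    let semaphore = DispatchSemaphore(value: 0)
    let lock = NSLock()
    var outcome: Result<T, Error>?

    // Records the first resolution only and wakes the waiting thread once.
    func resolve(_ value: Result<T, Error>) {
        lock.lock()
        let isFirst = (outcome == nil)
        if isFirst { outcome = value }
        lock.unlock()
        if isFirst { semaphore.signal() }
    }

    body({ resolve(.success($0)) }, { resolve(.failure($0)) })
    semaphore.wait()                 // this is the blocking described above

    lock.lock(); defer { lock.unlock() }
    return try outcome!.get()        // rethrows the error if `error` was called
}

// Usage: the value is produced on a background queue while the caller blocks.
let answer: Int = try blockingSuspendAsync { continuation, _ in
    DispatchQueue.global().async { continuation(42) }
}
print(answer)
```

The semaphore makes the trade-off visible: the caller cannot do anything else while it waits, which is exactly what a real coroutine transformation would avoid.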

Assumptions

I made some assumptions about how coroutine contexts might work, and how to handle errors (such as cancellation) that are propagated up to beginAsync. These are described below:

Contextualizing async coroutines

The proposal for Contextualizing async coroutines is incorporated in this implementation. I found that it is desirable to allow multiple contexts with beginAsync. Also for nested beginAsync calls, the contexts for the outer scopes should be preserved. My take on this is the following:

  • The coroutine context type is Any
  • Multiple contexts are combined into an array [Any]
  • There is a global function getCoroutineContext<T>() -> T?. If the current coroutine context conforms to T then it is returned directly. Otherwise if the context is an [Any], the first member of the array that conforms to T is returned. If there is no match then nil is returned.
  • If getCoroutineContext has multiple matches, then inner contexts take precedence over outer contexts.
  • For nested calls to beginAsync, the outer coroutine context is merged with the new coroutine context to form the inner coroutine context using the following rules:
    1. If either outer or new is nil, then use the non-nil value
    2. If outer === new, then it is the same reference so just use outer
    3. If outer and new are both [Any], then concatenate new and outer (new comes first)
    4. If outer is [Any], then prepend new to outer
    5. If new is [Any], then append outer to new
    6. Otherwise, combine new and outer into an [Any] (new comes first)
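
The lookup and merge rules above can be sketched as two small functions. This is a reading of the rules as written, not the project's code: `lookupContext`, `mergeContexts`, and `isSameReference` are hypothetical names, and the context is passed in explicitly instead of being read from the current coroutine.

```swift
/// Lookup: the context itself is tried first, then the first array member of type T.
func lookupContext<T>(_ context: Any?, as type: T.Type = T.self) -> T? {
    guard let context = context else { return nil }
    if let match = context as? T { return match }
    if let list = context as? [Any] {
        for item in list {
            if let match = item as? T { return match }
        }
    }
    return nil
}

/// True only when both values are class instances and the very same object.
func isSameReference(_ a: Any, _ b: Any) -> Bool {
    guard type(of: a) is AnyClass, type(of: b) is AnyClass else { return false }
    return (a as AnyObject) === (b as AnyObject)
}

/// Merge for nested beginAsync calls; `new` always ends up ahead of `outer`.
func mergeContexts(outer: Any?, new: Any?) -> Any? {
    guard let outer = outer else { return new }            // rule 1
    guard let new = new else { return outer }              // rule 1
    if isSameReference(outer, new) { return outer }        // rule 2

    switch (outer as? [Any], new as? [Any]) {
    case let (outerList?, newList?):
        return newList + outerList                         // rule 3
    case let (outerList?, nil):
        return [new] + outerList                           // rule 4
    case let (nil, newList?):
        return newList + [outer]                           // rule 5
    case (nil, nil):
        return [new, outer] as [Any]                       // rule 6
    }
}

// Usage: with two contexts of the same type, the inner (new) one is found first.
final class DemoCancelContext {}
let outerContext = DemoCancelContext()
let innerContext = DemoCancelContext()
let mergedContext = mergeContexts(outer: outerContext, new: innerContext)
let foundContext = lookupContext(mergedContext, as: DemoCancelContext.self)
print(foundContext === innerContext)
```

Because new contexts are always placed ahead of outer ones, a first-match scan is enough to give inner contexts precedence.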

Error handling for beginAsync

Error handling for beginAsync is not fully specified in 'Async/Await for Swift' by Chris Lattner and Joe Groff.

To handle errors thrown by the body parameter in the beginAsync function, I've added an optional error handler parameter to beginAsync.

API

For 'cancellation' abilities the following changes and additions are proposed and are experimentally implemented in this project:

/// Represents a generic asynchronous task
public protocol AsyncTask {
    func cancel()
    var isCancelled: Bool { get }
}

/// The CancelToken is used to register an AsyncTask within a cancellation scope
public protocol CancelToken {
    func add(task: AsyncTask)    
    func cancel()    
    var isCancelled: Bool { get }
}

/// Cancellation error
public enum AsyncError: Error {
    case cancelled
}

/// Shortcut to check for cancellation errors
extension Error {
    public var isCancelled: Bool {
        do {
            throw self
        } catch AsyncError.cancelled {
            return true
        } catch {
            return false
        }
    }
}

/**
 The CancelContext serves to:
 - Cancel tasks
 - Provide a CancelToken for registering cancellable tasks
 - Provide the current list of cancellable tasks, allowing extensions of CancelContext to define new operations on tasks
 - Optionally specify a timeout for its associated tasks
 Note: A task is considered resolved if either its `continuation` or its `error` closure has been invoked.
 */
public class CancelContext: CancelToken {
    public func cancel()  
    public var isCancelled: Bool { get }

    /// Add a cancellable task to the cancel context
    public func add(task: AsyncTask)

    /// Create a token that can be used to associate a task with this context, and can be used
    /// to cancel or set a timeout on only the token's tasks (as a subset of the CancelContext tasks).
    public func makeCancelToken() -> CancelToken

    /// All associated unresolved tasks will be cancelled after the given timeout.
    /// Default is no timeout.
    public var timeout: TimeInterval

    /// The list of unresolved tasks for this cancel context
    public var tasks: [CancelContext.Cancellable] { get }

    public struct Cancellable {
        let task: AsyncTask
        let tokenId: UInt
        let error: (Error) -> ()
    }
}

/**
 Return a coroutine context matching the given type `T` by applying the following checks in sequential order:
 1. If the coroutine context matches type 'T', then it is returned
 2. If the coroutine context is an array, then return the first item in the array matching `T`
 3. Return `nil` if there are no matches
 */
public func getCoroutineContext<T>() -> T?

/**
 Begins an asynchronous coroutine, transferring control to `body` until it
 either suspends itself for the first time with `suspendAsync` or completes,
 at which point `beginAsync` returns. If the async process completes by
 throwing an error before suspending itself, `beginAsync` rethrows the error.

 Calls to `beginAsync` may be nested, which can be used to provide additional
 coroutine contexts.  Coroutine contexts from outer scopes are inherited by
 concatenating all contexts as an array.  The `getCoroutineContext` function
 runs through this array looking for a matching type.

 - Parameter asyncContext: the context to use for all encapsulated coroutines
 - Parameter error: invoked if 'body' throws an error
*/
public func beginAsync(
    asyncContext: Any? = nil,
    error: ((Error) -> ())? = nil,
    _ body: @escaping () throws -> Void) rethrows

/**
 Suspends the current asynchronous task and invokes `body` with the task's
 continuation closure. Invoking `continuation` will resume the coroutine
 by having `suspendAsync` return the value passed into the continuation.
 It is a fatal error for `continuation` to be invoked more than once.

 - Note: Cancellation is not supported with this flavor of `suspendAsync`
   and attempts to access the `cancelToken` will trigger a fatal error.
*/
public func suspendAsync<T>(
    _ body: @escaping (_ continuation: @escaping (T) -> ()) -> ()
    ) -> T

/**
 Suspends the current asynchronous task and invokes `body` with the task's
 continuation and failure closures. Invoking `continuation` will resume the
 coroutine by having `suspendAsync` return the value passed into the
 continuation. Invoking `error` will resume the coroutine by having
 `suspendAsync` throw the error passed into it. Only one of
 `continuation` and `error` may be called; it is a fatal error if both are
 called, or if either is called more than once.

 Code inside `body` can support cancellation as follows:

     // 'task' conforms to 'AsyncTask'
     (getCoroutineContext() as CancelToken?)?.add(task: task)

*/
public func suspendAsync<T>(
    _ body: @escaping (_ continuation: @escaping (T) -> (),
    _ error: @escaping (Error) -> ()) -> ()
    ) throws -> T

This example shows how to define URLSession.dataTask as a coroutine that supports cancellation:

/// Add 'suspend' and 'resume' capabilities to CancelContext
extension CancelContext {
    func suspendURLSessionTasks() { tasks.forEach { ($0.task as? URLSessionTask)?.suspend() } }
    func resumeURLSessionTasks() { tasks.forEach { ($0.task as? URLSessionTask)?.resume() } }
}

/// Extend URLSessionTask to be an AsyncTask
extension URLSessionTask: AsyncTask {
    public var isCancelled: Bool {
        return state == .canceling || (error as NSError?)?.code == NSURLErrorCancelled
    }
}

/// Add async version of dataTask(with:) which uses suspendAsync for the callback
extension URLSession {
    func dataTask(with request: URLRequest) async throws -> (request: URLRequest, response: URLResponse, data: Data) {
        return try await suspendAsync { continuation, error in
            let task = self.dataTask(with: request) { data, response, err in
                if let err = err {
                    error(err)
                } else if let response = response, let data = data {
                    continuation((request, response, data))
                }
            }
            (getCoroutineContext() as CancelToken?)?.add(task: task)
            task.resume()
        }
    }
}

This example demonstrates usage of the URLSession.dataTask coroutine including cancellation and timeout:

import Foundation

// Example: how to make a cancellable web request with the
// URLSession.dataTask coroutine
let cancelContext = CancelContext()
let error: (Error) -> () = { error in
    print("Apple search error: \(error)")
}

do {
    try beginAsync(context: cancelContext, error: error) {
        let urlSession = URLSession(configuration: .default)
        let request = URLRequest(url: URL(string: "https://itunes.apple.com/search")!)
        let result = try await urlSession.dataTask(with: request)
        print("result: \(String(data: result.data, encoding: .utf8))")
    }
} catch {
    // Error is handled by the beginAsync 'error' callback
}

// Set a timeout (seconds) to prevent hangs
cancelContext.timeout = 30.0

...

// Call 'cancel' to abort the request
cancelContext.cancel()

Here is the image loading example from the original Async/Await proposal, along with cancellation and timeout abilities (from main.swift):

/// For the purpose of this example, send a simple web request rather than loading actual image data
func loadWebResource(_ name: String) async throws -> Data {
    let urlSession = URLSession(configuration: .default)
    let request = URLRequest(url: URL(string: "https://mydatarepo.com/\(name)")!)
    let result = try await urlSession.dataTask(with: request)
    return result.data
}

func decodeImage(_ profile: Data, _ data: Data) async throws -> Image

func dewarpAndCleanupImage(_ image : Image) async -> Image

/// Image loading example
func processImageData1a() async throws -> Image {
    let dataResource  = Future { try await loadWebResource("dataprofile.txt") }
    let imageResource = Future { try await loadWebResource("imagedata.dat") }

    // ... other stuff can go here to cover load latency...

    let imageTmp    = try await decodeImage(try dataResource.get(), try imageResource.get())
    let imageResult = await dewarpAndCleanupImage(imageTmp)
    return imageResult
}

// Execute the image loading example
let queue = DispatchQueue.global(qos: .default)
let cancelContext = CancelContext()
let error: (Error) -> () = { error in
    print("Image loading error: \(error)")
}

do {
    try beginAsync(context: [cancelContext, queue], error: error) {
        let result = try await processImageData1a()
        print("Image result: \(result)")
    }
} catch {
    // Error is handled by the beginAsync 'error' callback
}


// Set a timeout (seconds) to prevent hangs
cancelContext.timeout = 30.0

...

// Call cancel to abort the request
cancelContext.cancel()