Incremental migration to Structured Concurrency

I'm migrating an existing project which uses both completion handlers and semaphores. I'd very much appreciate others' critique of the functions I've created to facilitate this migration.

I've created the inverse of withCheckedContinuation et al., called withAsync and withThrowingAsync, in both blocking and non-blocking forms. They're there to account for a number of transitions I need to make. Here are a few examples:

withThrowingAsync {
    return "withThrowingAsync with completion handler"
} completion: { result in
    print(result)
}

let value = try withThrowingAsync {
    return "Blocking withThrowingAsync"
}
print(value)

let task = Task {
    return "Waiting on a Task"
}
print(task.wait())

Completion handlers

Usage

withThrowingAsync {
    // Do some awaiting then return the result or throw
} completion: { result in
    // Handle the result of the async block in good old GCD/blocks
}

Implementation

public func withAsync<Object>(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Object, completion: @escaping (Object) -> Void) {
    Task(priority: priority) {
        let object = await operation()
        completion(object)
    }
}

public func withThrowingAsync<Object>(priority: TaskPriority? = nil, operation: @escaping @Sendable () async throws -> Object, completion: @escaping (Result<Object, Error>) -> Void) {
    Task(priority: priority) {
        do {
            completion(.success(try await operation()))
        } catch {
            completion(.failure(error))
        }
    }
}

Semaphores / blocking code

Usage

let value = try withThrowingAsync {
   // Do some awaiting then return the result or throw
}
// withThrowingAsync blocks until the async block finishes; handle its result here in normal synchronous code

Implementation

public func withAsync<Object>(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Object) -> Object {
    let semaphore = ObjectSemaphore<Object>()
    withAsync(priority: priority, operation: operation, completion: semaphore.signal)
    return semaphore.wait()
}

public func withThrowingAsync<Object>(priority: TaskPriority? = nil, operation: @escaping @Sendable () async throws -> Object) throws -> Object {
    let semaphore = ObjectSemaphore<Result<Object, Error>>()
    withThrowingAsync(priority: priority, operation: operation, completion: semaphore.signal)
    return try semaphore.wait().get()
}

ObjectSemaphore is a bit like Combine.PassthroughSubject but blocking. It encapsulates the common scenario of trying to safely handle the results of an async completion handler we are waiting on.

private class ObjectSemaphore<Object> {
    let semaphore = DispatchSemaphore(value: 0)
    
    private var _object: Object!
    
    func signal(_ object: Object) {
        _object = object
        semaphore.signal()
    }

    func wait() -> Object {
        semaphore.wait()
        return _object
    }
}

Blocking Tasks

For completeness I've also added a few functions to make waiting on Task objects a bit easier, though I don't think they're needed in my project.

extension Task where Failure == Never {
    func wait() -> Success {
        return withAsync(priority: nil) {
            return await value
        }
    }
}

extension Task where Failure: Error {
    func wait() throws -> Success {
        return try withThrowingAsync(priority: nil) {
            return try await value
        }
    }
}

The WWDC 2021 session "Swift concurrency: Behind the scenes" explicitly calls this pattern out as unsafe because "it violates the runtime contract that threads must always be able to make forward progress" (paraphrased). Start watching around the 25-minute mark.

As I understand it, if you use this pattern, it's possible that all threads in the cooperative thread pool will block at the same time while waiting on semaphores. And this will deadlock the whole concurrency system because the runtime won't spawn more threads to allow the tasks that could signal the semaphores to make progress.

They recommend testing your app with the environment variable LIBDISPATCH_COOPERATIVE_POOL_STRICT=1 to identify uses of these unsafe patterns in your code.

Unfortunately, Apple doesn't offer a better solution in that session to "block while waiting for a Task to complete". As I understand it, this pattern is explicitly not supported because it goes against the design of the system on a fundamental level.

The approach with the semaphores may still be safe (I'm not sure!) if you can guarantee that the semaphore.wait() call occurs outside the cooperative thread pool, e.g. in a thread you spawned manually. But this wouldn't help you anyway if you want to block the current thread.

(cc @rokhinip, who may be able to give better advice.)

Similar to @ole's comment, I think you need to be very careful with the completion handlers: because you call the completion handler on the Task's thread, it will run on the cooperative thread pool as well. If any code in the completion handler then blocks, directly or indirectly, you will again violate the "runtime contract". Not sure, tbh, but maybe you should manually dispatch the completion handler with DispatchQueue.global().async?

Really, you'd want to take a queue as a parameter and call the completion handler on that queue. This often defaults to .main rather than a random global queue, since completion work frequently wants to update the UI in some way.
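
A minimal sketch of that suggestion, assuming the caller picks the delivery queue and it defaults to `.main`. The `queue:` parameter is an addition made for this sketch, not part of the original functions, and the sketch also requires `Object: Sendable` and a `@Sendable` completion so the result can be handed across to the queue cleanly under strict concurrency checking.

```swift
import Dispatch

/// Sketch: like the original `withThrowingAsync`, but the completion handler is
/// delivered on `queue` instead of on the cooperative-pool thread that ran the task.
/// The `queue:` parameter is a hypothetical addition, not part of the original API.
public func withThrowingAsync<Object: Sendable>(
    priority: TaskPriority? = nil,
    queue: DispatchQueue = .main,
    operation: @escaping @Sendable () async throws -> Object,
    completion: @escaping @Sendable (Result<Object, Error>) -> Void
) {
    Task(priority: priority) {
        // Capture the outcome first, while still running in the task.
        let result: Result<Object, Error>
        do {
            result = .success(try await operation())
        } catch {
            result = .failure(error)
        }
        // Hop off the cooperative pool before running caller code, so a
        // completion handler that blocks does not tie up a pool thread.
        queue.async {
            completion(result)
        }
    }
}
```

With the `.main` default, the handler only runs once the main queue is serviced (for example by the app's run loop), which is usually what UI code wants.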

Yes, this is correct, precisely for the reasons you've cited. To answer the original question at a high level: the code that is most natural to start migrating towards Swift Concurrency primitives is code which already presents an asynchronous interface.
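
For code that already exposes a completion handler, the standard bridge in that direction is `withCheckedThrowingContinuation`, which suspends the calling task instead of blocking a thread. A minimal sketch; `legacyFetchName(id:completion:)` and `LegacyError` are hypothetical stand-ins for an existing callback-based API.

```swift
import Dispatch

// Hypothetical error type for the stand-in legacy API.
enum LegacyError: Error {
    case invalidID
}

// Hypothetical existing API that reports its result through a completion handler.
func legacyFetchName(id: Int, completion: @escaping @Sendable (Result<String, Error>) -> Void) {
    DispatchQueue.global().async {
        if id > 0 {
            completion(.success("user-\(id)"))
        } else {
            completion(.failure(LegacyError.invalidID))
        }
    }
}

// Async wrapper: the calling task suspends until the callback fires, so no
// thread (cooperative or otherwise) sits blocked waiting for the result.
func fetchName(id: Int) async throws -> String {
    try await withCheckedThrowingContinuation { continuation in
        legacyFetchName(id: id) { result in
            // Resume exactly once with whatever the callback delivered.
            continuation.resume(with: result)
        }
    }
}
```

Async callers then write `let name = try await fetchName(id: 7)`, and no blocking wrapper is needed on that path.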

One other thing I'd like to highlight is that not all blocking is bad. If you are blocking on IO, or using an os_unfair_lock for data synchronization purposes in a tight critical section, the blocking here is temporary and therefore, fine. The effect is that you may have reduced throughput on the cooperative thread pool, but you will still be able to make forward progress. The dangerous pattern is when you are blocking on other future work to execute - like when you might be relying on the (NCPU+1)th thread to run some code to unblock the NCPU threads that are blocked on a semaphore.

As such, synchronous interfaces which may block can be used from async code with caution. However, using async code while presenting a synchronous interface - which is what the original question is trying to do - is really the recipe for trouble. The deadlock problem is the most significant and unrecoverable one, but there are other problems as well, like susceptibility to priority inversions which may result in user-visible hangs and glitches.

Faking synchronous behaviour while using something asynchronous under the hood means that the runtime will not know who is going to unblock the synchronous work and therefore cannot resolve priority inversion issues.

As we mentioned in our WWDC 2017 talk, it is important to use primitives which have a notion of "single ownership" in order for priority inversions to be resolved. Semaphores and condition variables do not have such a notion. See "Modernizing Grand Central Dispatch Usage" (WWDC17) for more detail.

Swift concurrency primitives like actors and tasks fall into the single-ownership category and are therefore capable of providing priority inversion avoidance support.

Thank you everyone, you've given me lots to think about and keep in mind.

The issue we have is that some of the code I'm working on already uses semaphores to wrap callbacks. I should have marked all my functions above with:

@available(*, deprecated, message: "Finish this refactor ASAP")

I think it depends what you care about. Isn't it true that blocking a thread implies a context switch unless there's no other work to do in the entire system (including other processes)? Context switches are expensive, which makes them “bad” for some definition thereof. IIUC this is one of the reasons we use GCD and other thread pools: so that instead of paying for a context switch, a thread can continue with another fragment of work.

This is a great talk! But I'm finding this point confusing. They say:

Using a lock in synchronous code is safe when used for data synchronization around a tight, well-known critical section. This is because the thread holding the lock is always able to make forward progress towards releasing the lock. As such, while the primitive may block a thread for a short period of time under contention, it does not violate the runtime contract of forward progress. It is worth noting that unlike Swift concurrency primitives,

Now, I don't understand how this upholds the guarantee at all. Sure, the thread that succeeds in acquiring the lock makes forward progress, but any other thread contending for it will block, and thus not make forward progress. So what guarantee am I actually being asked to uphold? That at least one thread makes forward progress? I don't think that's quite it, because I can easily deadlock two out of my quad-core system's four threads using a pair of mutexes locked in different orders, and surely that would be bad.

In fact, later in this talk, there's:

Such a code pattern means that a thread can block indefinitely against the semaphore until another thread is able to unblock it. This violates the runtime contract of forward progress for threads.

…which is not different, in any way I can tell, from the critical section scenario, unless there's some way to measure whether something is blocking “indefinitely.” AFAICT if I'm going to uphold a guarantee, it needs to be well-defined, and this one doesn't quite seem to be. Is there a more rigorous formulation akin to what's found here?

Yes, blocking typically involves a thread context switch which is expensive. But it's all relative - here when I said "bad", I meant relative to unsafe behaviour which can lead to unrecoverable deadlocks. It is definitely not efficient to block or context switch excessively which I've talked about in the Swift Concurrency: Behind the Scenes WWDC talk.

There is more nuance here than a blanket statement of saying "blocking is bad".

It is a well defined guarantee that you are being asked to uphold and I hope it becomes clearer if you consider the distinction between locks and semaphores.

Threads that are blocked on a lock are blocked on another thread that is already holding the lock and is in the critical region. That thread is most likely on another CPU executing the critical region. As a result, the blocked threads will become unblocked shortly after, when the lock is released. When you have N threads all blocked on a single lock, they will make progress serially through the critical region protected by the lock. So yes, for a short period of time while blocked on a lock, each of those N-1 threads is not making forward progress. But on a longer timescale - say on the order of hundreds of milliseconds - each of those N-1 threads will eventually get the lock, finish its work and make forward progress.

I don't think that's quite it, because I can easily deadlock two out of my quad-core system's four threads using a pair of mutexes locked in different orders, and surely that would be bad.

Yes and that would be bad anywhere, regardless of whether you run on Swift concurrency's cooperative pool or not, because you don't have well-defined lock ordering.

Semaphores, on the other hand, are different from locks because you never know who is going to unblock a thread blocked on a semaphore. It could be another thread that is running on core, making progress, and that will signal the semaphore. Or it could be work that has yet to even start executing - i.e., future work. It is that ambiguity which makes it impossible to hold a semaphore safely in the cooperative thread pool. You can easily have NCPU tasks all blocked on a semaphore, expecting an (NCPU+1)th task to get a thread, run, and then signal the semaphore to unblock the NCPU tasks.

AFAICT if I'm going to uphold a guarantee, it needs to be well-defined, and this one doesn't quite seem to be. Is there a more rigorous formulation akin to what's found here?

Another way to think about the guarantee you are being asked to hold is the following: Your Swift Concurrency workload should be able to complete even if the thread pool decides to give you just a single thread.

Using a lock in such a workload would be just fine, the same thread will take the lock and drop it each time. Using a semaphore however, won't be safe. The single thread may run a task that is expecting some condition to be met and blocks on the semaphore until it is. Your workload is now deadlocked unless you are given another thread. It cannot make forward progress here on its own.

Now, if your code was instead awaiting on such a condition, that thread would switch away from the task that cannot make progress, to execute another task which can. Eventually, your workload would complete. The LIBDISPATCH_COOPERATIVE_POOL_STRICT=1 environment variable helps you uphold your runtime guarantee by doing exactly this - it restricts your thread pool to size 1.
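
A minimal sketch of what awaiting on such a condition can look like in place of a semaphore. `OneShotSignal` is a hypothetical helper, not a standard library type: waiters suspend on a continuation instead of blocking their thread, so even a one-thread pool can go on to run whichever task eventually calls `signal()`.

```swift
/// Hypothetical one-shot "gate": an await-based stand-in for a semaphore that is
/// signalled once. Waiting suspends the task; it never blocks a thread.
actor OneShotSignal {
    private var isSignaled = false
    private var waiters: [CheckedContinuation<Void, Never>] = []

    /// Returns immediately if already signalled; otherwise suspends until `signal()`.
    func wait() async {
        if isSignaled { return }
        await withCheckedContinuation { continuation in
            // The body runs synchronously, still isolated to the actor, so no
            // `signal()` can slip in between the check above and this append.
            waiters.append(continuation)
        }
    }

    /// Marks the gate open and resumes every suspended waiter exactly once.
    func signal() {
        guard !isSignaled else { return }
        isSignaled = true
        let pending = waiters
        waiters.removeAll()
        for waiter in pending {
            waiter.resume()
        }
    }
}
```

The task stuck in `wait()` gives its thread back to the pool, which is exactly the property a blocked `semaphore.wait()` lacks.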

Thanks for your answer (and for the talk), Rokhini!

I think that depends on the fairness of the scheduler and the priorities of those threads? I guess we're probably only concerned about blocking the threads in the cooperative thread pool here, where fairness and priority are known.

OK… assuming a thread never causes threads to block without guaranteeing that it will allow them to run again in a finite amount of time, is that enough? Or is this really about having thread dependency information (which is mentioned several times in the talk)?

Heh, this sounds really straightforward, but as I think about it, I'm not sure I know how to use that to draw conclusions about what blocking code I'm allowed to write. I've tried about a dozen ways to rephrase it, and they're all failing me.

Maybe it's just this:

  • if an async task T1 (or any sync code called by T1) causes another async task T2 (or any sync code called by T2) to block, it must allow T2 to run again before T1's next await, and before T1 returns.

Even if I got that right, I'm still not sure what the rules are for a thread that's not in the cooperative pool that causes an async task to block.

Thanks again for taking the time.

Yes true, we can easily construct a system whereby all of your threads are preempted for long periods of time due to a constant stream of higher priority work elsewhere - maybe even in a different process. But the point I'm trying to make is that, if the threads are given the CPU time, those threads will make progress and complete.

OK… assuming a thread never causes threads to block without guaranteeing that it will allow them to run again in a finite amount of time, is that enough?

I'm not sure how you can really promise that in code that you write, without knowledge of how the thread pool would schedule your tasks. It's pretty fragile to rely upon it especially if changes are made in scheduling behaviour or even in the number of threads that the thread pool would give you. For instance, you can imagine that the cooperative thread pool may want flexibility to dynamically size up or down depending on what other workload is running on the system so as not to overcommit more threads than we have cores.

Maybe it's just this:

  • if an async task T1 (or any sync code called by T1) causes another async task T2 (or any sync code called by T2) to block, it must allow T2 to run again before T1's next await, and before T1 returns.

Even if I got that right, I'm still not sure what the rules are for a thread that's not in the cooperative pool that causes an async task to block.

I'll be honest, I'm not sure I fully grokked that formalization.

What do you mean by "another thread causing an async task to block"? That can only happen if your async task and this other thread are using some primitive to synchronize between them. So the code you can write really comes down to what kinds of primitives are safe to use in this world and whether they express the dependencies clearly (as mentioned in the talk):

Is it an await-ing on another task or on an actor whereby the dependency on what will unblock the task is clearly expressed? Or a lock where the thread dependency on the lock owner is known? Or is the primitive being used here something like a semaphore or a condition variable where it is not known who or what thread or work will signal it?

These two methods are such thin wrappers over Task that I don't understand why they exist at all. Why not just use Task directly instead of these methods?

You're correct; in the end they seem almost pointless, as the real action was here:

Thanks again for all your comments. To update you all on what I actually did: I created a semaphore-based analogue of CheckedContinuation, then replicated withCheckedThrowingContinuation. This let me remove structured concurrency completely for now, while still leaving the code shaped so I can switch to it in later refactoring.

@available(*, deprecated, message: "use withCheckedThrowingContinuation")
public func withThrowingSemaphore<T>(queue: DispatchQueue = DispatchQueue.global(),
                                       _ body: @escaping (SemaphoreContinuation<T>) -> Void) throws -> T {
    let semaphore = SemaphoreContinuation<T>()
    queue.async {
        body(semaphore)
    }
    return try semaphore.wait()
}

@available(*, deprecated, message: "use withCheckedThrowingContinuation")
public func withThrowingSemaphore(queue: DispatchQueue = DispatchQueue.global(),
                                    _ body: @escaping (SemaphoreContinuation<Void>) -> Void) throws {
    let semaphore = SemaphoreContinuation<Void>()
    queue.async {
        body(semaphore)
    }
    return try semaphore.wait()
}

// Semaphore based analogue of CheckedContinuation
public class SemaphoreContinuation<T> {
    let semaphore = DispatchSemaphore(value: 0)
    private var _result: Result<T,Error>!

    public func resume(returning x: T) {
        _result = .success(x)
        semaphore.signal()
    }
    
    public func resume(throwing x: Error) {
        _result = .failure(x)
        semaphore.signal()
    }
}

extension SemaphoreContinuation where T == Void {
    public func resume() {
        resume(returning: ())
    }
}

fileprivate extension SemaphoreContinuation {
    
    func wait() throws -> T {
        semaphore.wait()
        return try _result.get()
    }
}

The only issue I left unsolved was the @escaping body, which, because of the semaphore, isn't actually escaping.
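
One possible way to drop the `@escaping`, assuming `body` only starts the callback-based work and then returns: call `body` synchronously on the current thread, as `withCheckedThrowingContinuation` does, rather than via `queue.async`. This is a sketch, not a drop-in replacement. It still blocks the calling thread, so the cautions above about the cooperative pool apply unchanged, and it will deadlock if the callback is delivered on the very queue that is waiting.

```swift
import Dispatch

// Compact copy of the semaphore-based continuation so this sketch stands alone.
public final class SemaphoreContinuation<T>: @unchecked Sendable {
    private let semaphore = DispatchSemaphore(value: 0)
    private var result: Result<T, Error>?

    public func resume(returning value: T) {
        result = .success(value)
        semaphore.signal()
    }

    public func resume(throwing error: Error) {
        result = .failure(error)
        semaphore.signal()
    }

    // signal()/wait() order the write in `resume` before this read.
    fileprivate func wait() throws -> T {
        semaphore.wait()
        return try result!.get()
    }
}

/// Non-escaping variant: `body` runs to completion before this function blocks,
/// so it does not need `@escaping`. Still blocking; keep it off the cooperative pool.
public func withThrowingSemaphore<T>(_ body: (SemaphoreContinuation<T>) -> Void) throws -> T {
    let continuation = SemaphoreContinuation<T>()
    body(continuation)             // starts the async work and returns
    return try continuation.wait() // blocks until resume(...) is called
}
```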

This is something I've been wondering about as well. I think we're ultimately going to need new locking primitives which operate on the Task level rather than the thread level, and which communicate task dependencies to executors.

I think it makes sense. This is the kind of information that the OS scheduler needs in order to efficiently schedule threads on physical execution units, so if we bring that in to the Swift runtime as executors managing tasks on a thread pool, they will need the same kind of metadata about the work that they manage as the OS scheduler currently has -- including which chunks of work are currently being blocked by which specific other chunks of work.

I also worry that we have already defined Executor, SerialExecutor, and UnownedSerialExecutor as public protocols in the standard library. Can those be extended to incorporate locking primitives/task dependency metadata without breaking ABI? :man_shrugging:

Sorry, let me try to do better.

What do you mean by "another thread causing an async task to block"? That can only happen if your async task and this other thread are using some primitive to synchronize between them.

Yes, exactly. I didn't want to phrase it in terms of some responsibility for or guarantee from the async task that gets blocked, because after all, it is blocked and can't do anything at all. I didn't want to phrase it in terms of some property of the overall system being built (like “it would work if there was only one thread”) because those kinds of properties can be devilishly hard to reason about in concurrent systems.

So really the code you can write really comes down to what kinds of primitives are safe to use in this world and whether they express the dependencies clearly (as mentioned in the talk):

Sure, I watched the whole talk and gratefully took in every word! But it's hard to imagine that's the whole story, since although you say some primitives can be used safely, you also advise caution in their use. I'm trying to figure out what the exact cautions are. Surely it comes down to what you do with the primitives? The simplest example I can think of that is likely problematic: a thread could acquire a lock and never release it. While I can imagine cases of plain multithreading where that is sometimes acceptable (if inefficient), it seems likely to be unacceptable for a fixed-sized cooperative thread pool. Another example: one thread could lock a mutex, then pass the lock to another thread (by move) to be unlocked. While that's not an error in a plain multithreaded system, I'm guessing it undermines the dependency information that the system assumes to be represented by a lock. Lastly, since a lock can be implemented in terms of a binary semaphore, it doesn't seem to be an intrinsic property of the semaphore that makes it problematic.

Another question: is failure to express dependency information potentially manifested in deadlock, or only in temporary priority inversion? The latter might be an acceptable risk for some applications, while the former is almost never acceptable.

Thanks,
Dave

Surely it comes down to what you do with the primitives? The simplest example I can think of that is likely problematic: a thread could acquire a lock and never release it. While I can imagine cases of plain multithreading where that is sometimes acceptable (if inefficient), it seems likely to be unacceptable for a fixed-sized cooperative thread pool.

That would be something that is risky in any thread pool. Thread pools are not infinite - most of them have some kind of limit in the end. It might be some really high number like 1024, or they may keep going until the kernel tells you there is no more memory to create more threads. So if you have a thread which never releases a lock and all other work items eventually need that lock, you will still eventually run into a deadlock of some kind because those work items can't make progress.

The cooperative pool simply brings the thread pool limit to be closer to NCPUs.

Another example: one thread could lock a mutex, then pass the lock to another thread (by move) to be unlocked. While that's not an error in a plain multithreaded system, I'm guessing it undermines the dependency information that the system assumes to be represented by a lock.

pthread mutexes very unfortunately allow this but if you do this, the behaviour is actually undefined - regardless of whether it is used in async code or not. What is the critical region if you have a thread that locks and another thread that unlocks it? It's extremely fragile and unclear what kind of synchronization you are expecting here and what your protected region is.

That is why os_unfair_locks actually explicitly enforce this by crashing your process if the thread that unlocks the lock is not the same one which took the lock.

Edit: I just noticed the very important point you made here - 'by move'. The locks that exist today don't support this, and as mentioned, os_unfair_locks explicitly enforce thread locality. If we have the ability to do this in the future, we'd have to rethink our lock APIs. The caution was made with what we currently provide and support today.

Lastly, since a lock can be implemented in terms of a binary semaphore, it doesn't seem to be an intrinsic property of the semaphore that makes it problematic.

Yes, you're right, but the only difference here is that you are using a single bit which is flipped on and off instead of having, say, a pthread_t worth of information for additional bookkeeping on who the locking thread is. This does mean that the primitive will allow you to unlock a mutex on a different thread than the one which locked it, but then you are falling into the same problem mentioned earlier, of having undefined behaviour.

But it's hard to imagine that's the whole story, since although you say some primitives can be used safely, you also advise caution in their use. I'm trying to figure out what the exact cautions are

Regarding locks, the caution is as follows: Locks intrinsically rely on thread locality - you need to unlock the lock on the same thread which took the lock. You can't hold a lock across an await because there is no guarantee that the same thread will pick up the continuation.
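
A minimal sketch of that rule, assuming Foundation's `NSLock` and a hypothetical `Cache` type: each lock/unlock pair lives inside one short synchronous method, and the async function calls those methods on either side of its `await` rather than holding the lock while suspended.

```swift
import Foundation

/// Hypothetical cache. The lock only ever guards a short, synchronous
/// critical section, so whoever holds it is always able to release it.
final class Cache: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String: Int] = [:]

    func value(for key: String) -> Int? {
        lock.lock()
        defer { lock.unlock() }
        return storage[key]
    }

    func store(_ value: Int, for key: String) {
        lock.lock()
        defer { lock.unlock() }
        storage[key] = value
    }
}

/// The lock is taken and released inside each synchronous call above,
/// so it is never held across the `await`, whichever thread resumes us.
func cachedValue(for key: String, in cache: Cache, compute: @Sendable () async -> Int) async -> Int {
    if let hit = cache.value(for: key) {
        return hit
    }
    let fresh = await compute()  // no lock held while suspended
    cache.store(fresh, for: key)
    return fresh
}
```

The trade-off of releasing the lock before the `await` is that two tasks missing the cache at the same time may both run `compute`; that is a question of correctness in your own state machine rather than of forward progress.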

Using thread local storage is another example of something that is not safe in Swift concurrency since you don't know which threads will pick up your task and the various partial tasks as it suspends and resumes.

The cases where you need caution with some primitives are really more about correctness in your state machine as opposed to "you are likely going to get into an unrecoverable state". Now if you use these primitives unsafely - like never releasing a lock - that is bad behaviour anywhere, regardless of whether that happens in async code or not.

See also SE-0340: Unavailable From Async Attribute which will provide annotations that API providers can use to warn against using such unsafe primitives in async code, and provide safer alternatives.

Another question: is failure to express dependency information potentially manifested in deadlock, or only in temporary priority inversion? The latter might be an acceptable risk for some applications, while the former is almost never acceptable.

Both.

Blocking a thread on a primitive can be safe if you can guarantee that the task which will unblock that primitive and your thread has already run or is concurrently running. This is something you cannot guarantee with a semaphore, but you can with a lock, because you will only block on the lock when you know someone else who will unblock you is already running code in their critical region.

The likelihood of deadlock when using a primitive without clear dependency information is higher in a thread pool with a small limit than in one with a much higher limit. In a thread pool with a higher limit, the thread pool will keep giving you threads until one of those threads runs your semaphore-signaling task, or the thread pool limit is hit. Most thread pools have a high enough limit that you will likely get away without hitting it and deadlocking. But this comes at the cost of thread explosion, inefficiencies and lots of contention.

With Swift concurrency, because we have the asynchronous waiting semantics of await, the choice made was to take advantage of that to build a more efficient cooperative thread pool instead.

I also encourage you to think about the risk of priority inversion in say, a constrained device like the Apple Watch where you could easily see a "small" priority inversion result in multi-second user visible hangs.

We do have this dependency information today for task level dependencies and we don't need new locking primitives for that - you simply await on a task. Structured concurrency already provides this dependency information.
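
A small illustration of "you simply await on a task", using `async let` child tasks; `loadName()` and `loadScore()` are hypothetical placeholders for real async work. Each `await` refers to a specific child task the parent is waiting on, which is the kind of dependency a semaphore hides.

```swift
// Hypothetical child operations standing in for real async work.
func loadName() async -> String { "Taylor" }
func loadScore() async -> Int { 42 }

// The parent task's dependencies are its two child tasks; awaiting them
// suspends the parent instead of blocking a thread.
func loadSummary() async -> String {
    async let name = loadName()
    async let score = loadScore()
    let (resolvedName, resolvedScore) = await (name, score)
    return "\(resolvedName): \(resolvedScore)"
}
```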

Today, the scheduler doesn't really perform any kind of DAG analysis to determine which piece of work to schedule first vs which to schedule next. That would be probably a bit too inefficient to factor into scheduling decisions.

That being said, the information is available if we wanted to use it in some form in the future.

Dumb question, but am I safe using DispatchSemaphore or should I switch to NSLock? I tried to work it out but can't decipher it from the documentation.