Using semaphores in actor code

I am playing with concurrent programming in Swift to better understand it.

I am using semaphores in actor code, but this is causing problems.

To be specific, I have a process pipeline - a sequence of processes where each process communicates with the one after it through a channel.

I have three processes (Source, Filter, and Sink) connected via two channels (Channel1 and Channel2) like this:

(Source) --- [Channel1] --- (Filter) --- [Channel2] --- (Sink)

The Source process generates signals and passes them to the Filter process, which in turn passes them on to the Sink process.

Because two processes share a channel, I have set up my channels as actors using queues for message transport.

actor Channel {
    private let queue : Queue
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore
    ...
}

class Queue <T> {
    private var store : [T]
    ...
}

Full Details

Channel.swift
//
//  Channel.swift
//

import Foundation

// ----------------------------------------------------------
//
actor Channel <T:Signal> {
    typealias Value = T
    private let queue = Queue <Value> ()
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore

    init (capacity: Int) {
        self.capacity = capacity
        self.valueSemaphore      = Semaphore (value: 0)
        self.congestionSemaphore = Semaphore (value: capacity)
    }
    
    func send (_ value: Value) async {
        congestionSemaphore.wait()
        await queue.enqueue (value)
        valueSemaphore.signal()
    }
    
    func receive () async -> Value {
        valueSemaphore.wait()
        let value = await queue.dequeue ()
        congestionSemaphore.signal ()
        return value
    }
}

// ----------------------------------------------------------
//
extension Channel {
    class Queue <T> {
        private var store : [T] = []
        
        func enqueue (_ value: T) async {
            store.append (value)
        }
        
        func dequeue () async -> T {
            store.removeFirst()
        }
    }
}

Process.swift
//
//  Process.swift
//

import Foundation

protocol Process {
    func run () async
    func print (diag v: String)
}

extension Process {
    func print (diag v: String) {
        Swift.print ("\(type (of:self)): \(v)")
    }
}

// ----------------------------------------------------------
//
class SourceProcess <T: Signal> : Process {
    typealias Value = Channel <T>.Value
    private let output : Channel <Value>
    private let signalGenerator: () -> Value
    
    init (output: Channel <Value>, signalGenerator: @escaping () -> Value) {
        self.output = output
        self.signalGenerator = signalGenerator
    }
    
    func run () async {
        let signal = signalGenerator ()
        print (diag: "send \(signal)...")
        await output.send (signal)
        print (diag: "done")
    }
}

// ----------------------------------------------------------
//
class SinkProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input : Channel <Value>
    
    init (input: Channel <Value>) {
        self.input = input
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
    }
}

// ----------------------------------------------------------
//
class FilterProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input  : Channel <Value>
    private let output : Channel <Value>

    init (input: Channel <Value>, output: Channel <Value>) {
        self.input  = input
        self.output = output
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
        
        // all-pass filter...
        
        print (diag: "send...")
        await output.send (signal)
        print (diag: "done")
    }
}

But this does not work. After emitting a few diagnostic messages, the program hangs, as if waiting for something; it consumes no CPU time at all (according to Activity Monitor on macOS).

I suspect that this might have something to do with the fact that I am using semaphores in the channel actor. But why would this cause the program to hang?

If, however, I make the channel a class and its queue an actor, then everything works fine.

class Channel {
    private let queue : Queue
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore
    ...
}

actor Queue <T> {
    private var store : [T]
    ...
}

Here is my working code in full if you would like to see it.

Driver.swift
//
//  Driver.swift
//

import Foundation

@main
struct Driver {
    static func main () async throws {
        let pipeline = ProcessPipeline()
        try? await pipeline.run ()
    }
}

ProcessPipeline.swift
//
//  ProcessPipeline.swift
//

import Foundation

struct ProcessPipeline {
    typealias MemberProcess = (Process, TaskPriority)
    let processes : [MemberProcess]
    
    init () {
        // Channels
        typealias Value = TestSignal
        let channel1 = Channel <Value> (capacity: 5)
        let channel2 = Channel <Value> (capacity: 1)
        
        // Processes
        // (Source) --- [Channel1] --- (Filter) --- [Channel2] --- (Sink)
        let source   = SourceProcess <Value> (output:channel1, signalGenerator: TestSignal.generateSignal)
        let filter   = FilterProcess <Value> (input:channel1, output:channel2)
        let sink     = SinkProcess   <Value> (input:channel2)
        
        self.processes = [(source, .low), (filter, .medium), (sink, .high)]
    }
}

extension ProcessPipeline {
    func run () async throws {
        // helper
        let hibernate = {() async -> Void in
            let dt = UInt64 (Int.random (in: 0..<5) * 1_000_000_000)
            try? await Task.sleep (nanoseconds: dt)
            return
        }

        // helper
        let startTask = { (process: Process, priority:TaskPriority) in
            Task (priority: priority) {
                while true {
                    await process.run ()
                    #if false
                    await Task.yield()
                    #else
                    await hibernate ()
                    #endif
                }
            }
        }
        
        // run each process in its own task
        let tasks = processes.map {
            startTask ($0, $1)
        }
    
        // wait here to keep the processes alive
        for task in tasks {
            _ = await task.result
        }
        print (#function, "done")
    }
}

Channel.swift
//
//  Channel.swift
//

import Foundation

// ----------------------------------------------------------
//
class Channel <T:Signal> {
    typealias Value = T
    private let queue = Queue <Value> ()
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore

    init (capacity: Int) {
        self.capacity = capacity
        self.valueSemaphore      = Semaphore (value: 0)
        self.congestionSemaphore = Semaphore (value: capacity)
    }
    
    func send (_ value: Value) async {
        congestionSemaphore.wait()
        await queue.enqueue (value)
        valueSemaphore.signal()
    }
    
    func receive () async -> Value {
        valueSemaphore.wait()
        let value = await queue.dequeue ()
        congestionSemaphore.signal ()
        return value
    }
}

// ----------------------------------------------------------
//
extension Channel {
    actor Queue <T> {
        private var store : [T] = []
        
        func enqueue (_ value: T) async {
            store.append (value)
        }
        
        func dequeue () async -> T {
            store.removeFirst()
        }
    }
}

Process.swift
//
//  Process.swift
//

import Foundation

protocol Process {
    func run () async
    func print (diag v: String)
}

extension Process {
    func print (diag v: String) {
        Swift.print ("\(type (of:self)): \(v)")
    }
}

// ----------------------------------------------------------
//
class SourceProcess <T: Signal> : Process {
    typealias Value = Channel <T>.Value
    private let output : Channel <Value>
    private let signalGenerator: () -> Value
    
    init (output: Channel <Value>, signalGenerator: @escaping () -> Value) {
        self.output = output
        self.signalGenerator = signalGenerator
    }
    
    func run () async {
        let signal = signalGenerator ()
        print (diag: "send \(signal)...")
        await output.send (signal)
        print (diag: "done")
    }
}

// ----------------------------------------------------------
//
class SinkProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input : Channel <Value>
    
    init (input: Channel <Value>) {
        self.input = input
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
    }
}

// ----------------------------------------------------------
//
class FilterProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input  : Channel <Value>
    private let output : Channel <Value>

    init (input: Channel <Value>, output: Channel <Value>) {
        self.input  = input
        self.output = output
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
        
        // all-pass filter...
        
        print (diag: "send...")
        await output.send (signal)
        print (diag: "done")
    }
}

Signal.swift
//
//  Signal.swift
//

import Foundation


protocol Signal : Sendable {
    static func generateSignal () -> Self
}

final class TestSignal: Signal {
    typealias Store = [Int]
    let value : Store
    
    init (value: Store) {
        self.value = value
    }
    
    static func generateSignal () -> TestSignal {
        var value = Store ()
        for _ in 1...8 {
            let digit = Int.random (in: 0...1)
            value.append (digit)
        }
        return TestSignal (value:value)
    }
}

extension TestSignal: CustomStringConvertible {
    var description: String {
        return value.map {String ($0)}.joined()
    }
}

As I said earlier, if you make the Channel an actor and the Queue a class, the program hangs; it consumes no CPU time at all (according to Activity Monitor on macOS).

I would like to know why.

Thank you.

I don’t have time to dig into your specific situation right now but, in general, blocking the thread running your Swift async function is a bad idea. Swift async functions are run by a small pool of threads managed by the Swift concurrency runtime. If you block those threads in a semaphore, you can easily deadlock the runtime.

For more background on this, watch WWDC 2021 Session 10254, Swift concurrency: Behind the scenes.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

The following custom semaphore does the job nicely. My channel actor no longer deadlocks!

actor Semaphore {
    private var count: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(count: Int = 0) {
        self.count = count
    }

    func wait() async {
        count -= 1
        if count >= 0 { return }
        await withCheckedContinuation {
            waiters.append($0)
        }        
    }

    func signal (count: Int = 1) {
        assert(count >= 1)
        self.count += count
        for _ in 0..<count {
            if waiters.isEmpty { return }
            waiters.removeFirst().resume()
        }
    }
}

[The original code is by @MarSe32m]
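
For illustration, here is a minimal sketch of how the channel from the original question might look on top of such an async semaphore. It is an assumption about how the pieces fit together, not the poster's actual code: the Semaphore actor is repeated (with signal trimmed to a single permit) so the block is self-contained, the queue is folded into the channel as a plain array, and T is constrained to Sendable rather than to the thread's Signal protocol.

```swift
// Async semaphore from the post above, trimmed to a single-permit signal().
actor Semaphore {
    private var count: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(count: Int = 0) { self.count = count }

    func wait() async {
        count -= 1
        if count >= 0 { return }
        // Suspends the calling task; no thread is blocked while waiting.
        await withCheckedContinuation { waiters.append($0) }
    }

    func signal() {
        count += 1
        if !waiters.isEmpty { waiters.removeFirst().resume() }
    }
}

// Bounded channel sketch (assumed shape; the queue is a plain array here).
actor Channel<T: Sendable> {
    private var store: [T] = []
    private let valueSemaphore: Semaphore        // counts values ready to receive
    private let congestionSemaphore: Semaphore   // counts free slots

    init(capacity: Int) {
        valueSemaphore = Semaphore(count: 0)
        congestionSemaphore = Semaphore(count: capacity)
    }

    func send(_ value: T) async {
        await congestionSemaphore.wait()     // suspend while the channel is full
        store.append(value)
        await valueSemaphore.signal()
    }

    func receive() async -> T {
        await valueSemaphore.wait()          // suspend while the channel is empty
        let value = store.removeFirst()
        await congestionSemaphore.signal()
        return value
    }
}
```

Because `await semaphore.wait()` suspends the task instead of blocking a thread, the channel actor stays free to run a pending receive while a send is waiting for space, which the DispatchSemaphore version could not do.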

Your Semaphore actor is very well done. Thanks.

@gwendal.roue has provided essentially the canonical Swift Concurrency semaphore, with the Semaphore package. I strongly suggest just using that - no need to roll your own.

I don't know if it's canonical (I'm eagerly waiting for the stdlib one!), but it has two nice features: support for cancellation, and the ability to use defer for signalling:

{
  try await semaphore.waitUnlessCancelled()
  defer { semaphore.signal() }

  // do stuff, await as many times as needed
}

Traditional semaphores are a bad pattern. While these actor-based semaphores don't have the defect of potential thread starvation, they still have the fundamental problem that there's absolutely no way to implement the API that doesn't lead to priority inversion. Swift is definitely missing some sort of low-level abstraction here for gating work on a condition, but I don't want to ever add an API that looks like this. Waiters need to know what tasks are currently responsible for unblocking them.

We're all curious! The main use case of semaphores, in the current state of the language and its documentation, is to address the "reentrancy problem", aka making sure that a sequence of suspension points cannot overlap. Maybe some people would prefer "do not" over "cannot", but "cannot" is stronger and quite reassuring, so it remains my favorite way to say it.
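
As a rough sketch of that use case: the Account actor below is hypothetical and only illustrates the pattern. Its deposit method reads state, suspends, then writes, so without the gate two calls could interleave at the suspension point and lose an update. The small Semaphore actor is the hand-rolled one from earlier in the thread (trimmed), repeated so the block is self-contained; since its signal() is actor-isolated, it has to be awaited and cannot go in a defer.

```swift
// Hand-rolled async semaphore from earlier in the thread (trimmed).
actor Semaphore {
    private var count: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(count: Int = 0) { self.count = count }

    func wait() async {
        count -= 1
        if count >= 0 { return }
        await withCheckedContinuation { waiters.append($0) }
    }

    func signal() {
        count += 1
        if !waiters.isEmpty { waiters.removeFirst().resume() }
    }
}

// Hypothetical actor with a read-suspend-write sequence.
actor Account {
    private var balance = 0
    private let gate = Semaphore(count: 1)   // binary semaphore: one deposit at a time

    func deposit(_ amount: Int) async {
        await gate.wait()
        let snapshot = balance
        await Task.yield()                   // suspension point: other calls can enter the actor here
        balance = snapshot + amount          // safe only because the gate excludes other deposits
        await gate.signal()
    }

    func currentBalance() -> Int { balance }
}
```

With the gate in place, concurrent deposits cannot overlap across the suspension point; removing the two gate calls would let a later deposit overwrite an earlier one with a stale snapshot.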

I can't stress enough the problem of documentation. We often hear about wrong patterns (duly noted), but I don't know where to link to when someone (or myself) has a concurrency puzzle to solve and is looking for the correct pattern. A few years after the introduction of Swift concurrency, the fact that developers are still struggling with it can't be the sole responsibility of said developers.

Is there a better existing alternative to this Semaphore? For the purposes of mutual exclusion (i.e. locking), notifications, and throttling, at least?

Compatible with Swift Concurrency, I mean. There are obviously many standard types available in the "synchronous" world. I have looked, but have found very few that are async-compatible.

I completely agree. A Swift concurrency migration guide for Swift.org is under development. @mattie and I have gathered a list of important concepts, common code patterns, common pitfalls, and common data race safety warnings/errors with strategies for resolving them, and we're working on fleshing out the content. @angela-laar is also putting together a guide for strategically enabling the various Swift 6 upcoming features in a manner that will make the migration to Swift 6 smoother.

I hope to at least put up a draft PR against swift-org-website within the next couple weeks, even if some of the sections haven't been fully fleshed out yet, so that other folks can easily contribute and provide feedback.

That's superb news!

This sentence is far-reaching, because if I get it right, it will enforce the coupling of components that were previously considered as completely independent, both in code, and in mind.

I'll defend the semaphore a little bit, because not everybody is subject to priority inversion problems:

  1. A semaphore guarantees exclusivity regardless of the concurrency structure [1] around it, and is robust against the refactoring of said structure.
  2. A semaphore allows clients of a shared resource to ignore each other (seemingly in direct contradiction with the above sentence). In other words: a semaphore allows a shared resource to encapsulate its exclusive accesses without cooperation from the clients.
  3. When a problem of exclusive access is discovered in an app, slamming a semaphore at the correct place fixes the problem quickly without completely refactoring the concurrency structure and breaking the isolation between the clients of the shared resource, which might be a very large piece of work.
  4. When all tasks of a user have the same priority, there is no priority inversion problem.

I'm wondering if any of those points (robustness against refactoring, encapsulation, developer productivity, non-universality of priority inversion) have been balanced against the ideal goals of Swift concurrency. Don't get me wrong: I'm not fighting against the ideals, and I'm all willing to make my practices evolve: I just want to exhibit a few other strong forces that the ideals have to compete with in developers' minds and work environments.

On a positive note, I suppose Task.currentPriority could help improve my Semaphore to make it emit a runtime warning when I detect a priority inversion, just as LibDispatch does.
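
To make the idea concrete, here is one way such a check might look. This is only a sketch under assumptions, not how the Semaphore package works: the type name, the Waiter struct, the inversionWarnings counter, and the waiterCount property are all made up for illustration, and the check only notices an inversion after the fact, at the moment a lower-priority task signals a higher-priority waiter. Task.currentPriority and TaskPriority.rawValue are the only standard-library pieces it relies on.

```swift
// Hypothetical semaphore variant that records each waiter's priority.
actor PriorityCheckingSemaphore {
    private struct Waiter {
        let continuation: CheckedContinuation<Void, Never>
        let priority: TaskPriority
    }

    private var count: Int
    private var waiters: [Waiter] = []
    private(set) var inversionWarnings = 0   // illustrative counter, not a real API

    init(count: Int = 0) { self.count = count }

    var waiterCount: Int { waiters.count }

    func wait() async {
        count -= 1
        if count >= 0 { return }
        let priority = Task.currentPriority  // priority of the task about to suspend
        await withCheckedContinuation {
            waiters.append(Waiter(continuation: $0, priority: priority))
        }
    }

    func signal() {
        count += 1
        guard !waiters.isEmpty else { return }
        let waiter = waiters.removeFirst()
        // A waiter that outranks the signalling task was held up by lower-priority work.
        if Task.currentPriority.rawValue < waiter.priority.rawValue {
            inversionWarnings += 1
            print("warning: priority \(waiter.priority.rawValue) waiter unblocked by priority \(Task.currentPriority.rawValue) task")
        }
        waiter.continuation.resume()
    }
}
```

A warning like this can only report that an inversion happened; it does nothing to raise the signalling task's priority while the waiter is suspended, which is the part the runtime cannot do without knowing who will signal.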

I also have a negative note: knowing how little interest the Swift designers have in semaphores in general does not make me want to work on it, improve its FIFO with a Deque, explore new and efficient locks, maybe atomics, etc. Too much work has been dismissed as irrelevant, unworthy of interest, if not downright harmful (yes, I sometimes feel like pouting). Meanwhile, there are currently ~300 weekly unique cloners of groue/Semaphore, according to GitHub: there's a demand for exclusivity.


[1] Structure as in "structured concurrency" within tasks, i.e. async sequence loops, task groups, etc.

Isn't the same true about locks?

Yes, because a lock acquired by a low-priority job will block a high-priority job. Now, locks usually protect a very short piece of synchronous code. This makes locks a less nasty beast than async semaphores, even if you can write programs that use locks in a pathological way. See also this answer about the relationship between locks and Swift concurrency: [Pitch] Synchronous Mutual Exclusion Lock - #15 by Alejandro

Finally, developers still need to synchronize threads. The linked pitch states:

The main issue is that there isn't a single standardized implementation for this synchronization primitive resulting in everyone needing to roll their own.

I don't think that's correct. We know which thread is going to unlock the lock you're waiting for: it's the same thread that acquired the lock and is currently holding it.

I'm not sure of my understanding of "priority inversion" ;-) I usually think of it as "a high-priority thread HT is waiting for a low-priority thread LT, without possibility for the system to raise the priority of LT due to its lack of knowledge of the dependency." Maybe a system-provided lock can deal with it. But then couldn't a system-provided semaphore deal with this, as well? Now this goes beyond my understanding.

I guess that's the difference between a lock and a semaphore: the system knows which thread is going to unlock a lock but it can't know which thread is going to signal a semaphore.

If we focus on binary semaphores (0 or 1 user of the resource, vs. counting semaphores that allow up to N users), couldn't the concurrency runtime track whatever task has last acquired the semaphore and raise its priority when needed? In this thread we're not blocking threads, instead we're suspending tasks. Doesn't it give more flexibility?

"Priority inversion" is a bad thing that happens when a high-priority thread (HPT) waits on a lock (or semaphore?) that a low-priority thread (LPT) currently holds, while the LPT continues its business as usual at its own pace (at low priority).

"Priority inversion avoidance" is any means of avoiding it. One of the typical means (I believe we have had this in OSes for more than a decade now) is to boost the LPT's priority to match the HPT's priority in this situation (and of course restore the priority afterwards).

I am not sure how to detect that "priority inversion avoidance" actually happens. Naïvely calling Thread.current.threadPriority or pthread_getschedparam doesn't reflect the change of priority (that is supposed to happen).

Even with a binary semaphore, it's perfectly legal for a thread to call wait() and then exit, and some time later some unrelated thread calls signal(). If another thread calls wait() in between these two events, the runtime would see that the thread which last called wait() no longer exists and has no way to know which thread will call signal(). It would have to elevate every thread's priority to match the waiting thread's one.

This isn't just some weird hypothetical edge case; it's traditionally been the primary reason to use a semaphore rather than a mutex.
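
A small sketch of that classic hand-off use, in ordinary threaded code outside Swift concurrency. The handOff function and the Box class are hypothetical names used only for this example; the point is that the thread calling signal() never called wait(), so nothing in the semaphore identifies who will unblock the waiter.

```swift
import Dispatch

// Hypothetical mutable box shared between two threads; access is ordered by the semaphore.
final class Box: @unchecked Sendable {
    var value = 0
}

// A worker thread produces a value; the caller blocks until told it is ready.
func handOff() -> Int {
    let ready = DispatchSemaphore(value: 0)
    let box = Box()

    DispatchQueue.global().async {
        box.value = 42
        ready.signal()   // signalled by a thread that never called wait()
    }

    ready.wait()         // blocks the calling thread; acceptable here, outside an async context
    return box.value
}
```

With a mutex the owner is always the thread that locked it, so the system knows whose priority to boost; in the hand-off above, the waiter's unblocker is just whichever thread happens to call signal().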
