Using semaphores in actor code

I am playing with concurrent programming in Swift to better understand it.

I am using semaphores in actor code, but this is causing problems.

To be specific, I have a process pipeline - a sequence of processes where each process communicates with the one after it through a channel.

I have three processes (Source, Filter, and Sink) connected via two channels (Channel1 and Channel2) like this:

(Source) --- [Channel1] --- (Filter) --- [Channel2] --- (Sink)

The Source process generates signals and passes them to the Filter process, which in turn passes them on to the Sink process.

Because two processes share a channel, I have set up my channels as actors using queues for message transport.

actor Channel {
    private let queue : Queue
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore
    ...
}

 class Queue
     private var store:[T]
     ...
}

Full Details

Channel.swift
//
//  Channel.swift
//

import Foundation

// ----------------------------------------------------------
//
actor Channel <T:Signal> {
    typealias Value = T
    private let queue = Queue <Value> ()
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore

    init (capacity: Int) {
        self.capacity = capacity
        self.valueSemaphore      = Semaphore (value: 0)
        self.congestionSemaphore = Semaphore (value: capacity)
    }
    
    func send (_ value: Value) async {
        congestionSemaphore.wait()
        await queue.enqueue (value)
        valueSemaphore.signal()
    }
    
    func receive () async -> Value {
        valueSemaphore.wait()
        let value = await queue.deueue ()
        congestionSemaphore.signal ()
        return value
    }
}

// ----------------------------------------------------------
//
extension Channel {
    class Queue <T> {
        private var store : [T] = []
        
        func enqueue (_ value: T) async {
            store.append (value)
        }
        
        func deueue () async -> T {
            store.removeFirst()
        }
    }
}

Process.swift
//
//  Process.swift
//

import Foundation

protocol Process {
    func run () async
    func print (diag v: String)
}

extension Process {
    func print (diag v: String) {
        Swift.print ("\(type (of:self)): \(v)")
    }
}

// ----------------------------------------------------------
//
class SourceProcess <T: Signal> : Process {
    typealias Value = Channel <T>.Value
    private let output : Channel <Value>
    private let signalGenerator: () -> Value
    
    init (output: Channel <Value>, signalGenerator: @escaping () -> Value) {
        self.output = output
        self.signalGenerator = signalGenerator
    }
    
    func run () async {
        let signal = signalGenerator ()
        print (diag: "send \(signal)...")
        await output.send (signal)
        print (diag: "done")
    }
}

// ----------------------------------------------------------
//
class SinkProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input : Channel <Value>
    
    init (input: Channel <Value>) {
        self.input = input
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
    }
}

// ----------------------------------------------------------
//
class FilterProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input  : Channel <Value>
    private let output : Channel <Value>

    init (input: Channel <Value>, output: Channel <Value>) {
        self.input  = input
        self.output = output
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
        
        // all-pass filter...
        
        print (diag: "send...")
        await output.send (signal)
        print (diag: "done")
    }
}

But this does not work. After emitting a few diagnostic messages, the program hangs, as if waiting for something; it does not consume any cpu time at all (according to the Activity Monitor on macOS.)

I suspect that this might have something to do with the fact that I am using semaphores in the channel actor. But why would this cause the program to hang?

If, however, I make the channel a class and its queue an actor, then everything works fine.

class Channel {
    private let queue : Queue
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore
    ...
}

 actor Queue
     private var store : [T]
     ...
}

Here is my working code in full if you would like to see it.

Driver
//
//  Driver.swift
//

import Foundation

@main
struct Driver {
    static func main () async throws {
        let pipeline = ProcessPipeline()
        try? await pipeline.run ()
    }
}
ProcessPipeline
//
//  ProcessPipeline.swift
//

import Foundation

struct ProcessPipeline {
    typealias MemberProcess = (Process, TaskPriority)
    let processes : [MemberProcess]
    
    init () {
        // Channels
        typealias Value = TestSignal
        let channel1 = Channel <Value> (capacity: 5)
        let channel2 = Channel <Value> (capacity: 1)
        
        // Processes
        // (Source) --- [Channel1] --- (Filter) --- [Channel2] --- (Sink)
        let source   = SourceProcess <Value> (output:channel1, signalGenerator: TestSignal.generateSignal)
        let filter   = FilterProcess <Value> (input:channel1, output:channel2)
        let sink     = SinkProcess   <Value> (input:channel2)
        
        self.processes = [(source, .low), (filter, .medium), (sink, .high)]
    }
}

extension ProcessPipeline {
    func run () async throws {
        // helper
        let hibernate = {() async -> Void in
            let dt = UInt64 (Int.random (in: 0..<5) * 1_000_000_000)
            try? await Task.sleep (nanoseconds: dt)
            return
        }

        // helper
        let startTask = { (process: Process, priority:TaskPriority) in
            Task (priority: priority) {
                while true {
                    await process.run ()
                    #if false
                    await Task.yield()
                    #else
                    await hibernate ()
                    #endif
                }
            }
        }
        
        // run each process in its own task
        let tasks = processes.map {
            startTask ($0, $1)
        }
    
        // wait here to keep the processes alive
        for task in tasks {
            _ = await task.result
        }
        print (#function, "done")
    }
}
Channel
//
//  Channel.swift
//

import Foundation

// ----------------------------------------------------------
//
class Channel <T:Signal> {
    typealias Value = T
    private let queue = Queue <Value> ()
    private let capacity : Int
    
    typealias Semaphore = DispatchSemaphore
    var valueSemaphore      : Semaphore
    var congestionSemaphore : Semaphore

    init (capacity: Int) {
        self.capacity = capacity
        self.valueSemaphore      = Semaphore (value: 0)
        self.congestionSemaphore = Semaphore (value: capacity)
    }
    
    func send (_ value: Value) async {
        congestionSemaphore.wait()
        await queue.enqueue (value)
        valueSemaphore.signal()
    }
    
    func receive () async -> Value {
        valueSemaphore.wait()
        let value = await queue.deueue ()
        congestionSemaphore.signal ()
        return value
    }
}

// ----------------------------------------------------------
//
extension Channel {
    actor Queue <T> {
        private var store : [T] = []
        
        func enqueue (_ value: T) async {
            store.append (value)
        }
        
        func deueue () async -> T {
            store.removeFirst()
        }
    }
}
Process
//
//  Process.swift
//

import Foundation

protocol Process {
    func run () async
    func print (diag v: String)
}

extension Process {
    func print (diag v: String) {
        Swift.print ("\(type (of:self)): \(v)")
    }
}

// ----------------------------------------------------------
//
class SourceProcess <T: Signal> : Process {
    typealias Value = Channel <T>.Value
    private let output : Channel <Value>
    private let signalGenerator: () -> Value
    
    init (output: Channel <Value>, signalGenerator: @escaping () -> Value) {
        self.output = output
        self.signalGenerator = signalGenerator
    }
    
    func run () async {
        let signal = signalGenerator ()
        print (diag: "send \(signal)...")
        await output.send (signal)
        print (diag: "done")
    }
}

// ----------------------------------------------------------
//
class SinkProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input : Channel <Value>
    
    init (input: Channel <Value>) {
        self.input = input
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
    }
}

// ----------------------------------------------------------
//
class FilterProcess <T:Signal> : Process {
    typealias Value = Channel <T>.Value
    private let input  : Channel <Value>
    private let output : Channel <Value>

    init (input: Channel <Value>, output: Channel <Value>) {
        self.input  = input
        self.output = output
    }
    
    func run () async {
        print (diag: "receive...")
        let signal = await input.receive ()
        print (diag: "received \(signal)")
        
        // all-pass filter...
        
        print (diag: "send...")
        await output.send (signal)
        print (diag: "done")
    }
}
Signal
//
//  Signal.swift
//

import Foundation


protocol Signal : Sendable {
    static func generateSignal () -> Self
}

final class TestSignal: Signal {
    typealias Store = [Int]
    let value : Store
    
    init (value: Store) {
        self.value = value
    }
    
    static func generateSignal () -> TestSignal {
        var value = Store ()
        for _ in 1...8 {
            let digit = Int.random (in: 0...1)
            value.append (digit)
        }
        return TestSignal (value:value)
    }
}

extension TestSignal: CustomStringConvertible {
    var description: String {
        return value.map {String ($0)}.joined()
    }
}

As I have said earlier, if you make the Channel an actor and the Queue a class, then the program does nothing; it does not consume any cpu time at all (according to the Activity Monitor on macOS.)

I would like to know why?

Thank you.

I don’t have time to dig into your specific situation right now but, in general, blocking the thread running your Swift async function is a bad idea. Swift async functions are run by a small pool of threads managed by the Swift concurrency runtime. If you block those threads in a semaphore, you can easily deadlock the runtime.

For more background on this, watch WWDC 2021 Session 10254 Swift concurrency: Behind the scenes

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

2 Likes