I am playing with concurrent programming in Swift to better understand it.
I am using semaphores in actor code, but this is causing problems.
To be specific, I have a process pipeline - a sequence of processes where each process communicates with the one after it through a channel.
I have three processes (Source, Filter, and Sink) connected via two channels (Channel1 and Channel2) like this:
(Source) --- [Channel1] --- (Filter) --- [Channel2] --- (Sink)
The Source process generates signals and passes them to the Filter process, which in turn passes them on to the Sink process.
Because two processes share a channel, I have set up my channels as actors using queues for message transport.
actor Channel {
private let queue : Queue
private let capacity : Int
typealias Semaphore = DispatchSemaphore
var valueSemaphore : Semaphore
var congestionSemaphore : Semaphore
...
}
class Queue
private var store:[T]
...
}
Full Details
Channel.swift
//
// Channel.swift
//
import Foundation
// ----------------------------------------------------------
//
actor Channel <T:Signal> {
typealias Value = T
private let queue = Queue <Value> ()
private let capacity : Int
typealias Semaphore = DispatchSemaphore
var valueSemaphore : Semaphore
var congestionSemaphore : Semaphore
init (capacity: Int) {
self.capacity = capacity
self.valueSemaphore = Semaphore (value: 0)
self.congestionSemaphore = Semaphore (value: capacity)
}
func send (_ value: Value) async {
congestionSemaphore.wait()
await queue.enqueue (value)
valueSemaphore.signal()
}
func receive () async -> Value {
valueSemaphore.wait()
let value = await queue.deueue ()
congestionSemaphore.signal ()
return value
}
}
// ----------------------------------------------------------
//
extension Channel {
class Queue <T> {
private var store : [T] = []
func enqueue (_ value: T) async {
store.append (value)
}
func deueue () async -> T {
store.removeFirst()
}
}
}
Process.swift
//
// Process.swift
//
import Foundation
protocol Process {
func run () async
func print (diag v: String)
}
extension Process {
func print (diag v: String) {
Swift.print ("\(type (of:self)): \(v)")
}
}
// ----------------------------------------------------------
//
class SourceProcess <T: Signal> : Process {
typealias Value = Channel <T>.Value
private let output : Channel <Value>
private let signalGenerator: () -> Value
init (output: Channel <Value>, signalGenerator: @escaping () -> Value) {
self.output = output
self.signalGenerator = signalGenerator
}
func run () async {
let signal = signalGenerator ()
print (diag: "send \(signal)...")
await output.send (signal)
print (diag: "done")
}
}
// ----------------------------------------------------------
//
class SinkProcess <T:Signal> : Process {
typealias Value = Channel <T>.Value
private let input : Channel <Value>
init (input: Channel <Value>) {
self.input = input
}
func run () async {
print (diag: "receive...")
let signal = await input.receive ()
print (diag: "received \(signal)")
}
}
// ----------------------------------------------------------
//
class FilterProcess <T:Signal> : Process {
typealias Value = Channel <T>.Value
private let input : Channel <Value>
private let output : Channel <Value>
init (input: Channel <Value>, output: Channel <Value>) {
self.input = input
self.output = output
}
func run () async {
print (diag: "receive...")
let signal = await input.receive ()
print (diag: "received \(signal)")
// all-pass filter...
print (diag: "send...")
await output.send (signal)
print (diag: "done")
}
}
But this does not work. After emitting a few diagnostic messages, the program hangs, as if waiting for something; it does not consume any cpu time at all (according to the Activity Monitor on macOS.)
I suspect that this might have something to do with the fact that I am using semaphores in the channel actor. But why would this cause the program to hang?
If, however, I make the channel a class and its queue an actor, then everything works fine.
class Channel {
private let queue : Queue
private let capacity : Int
typealias Semaphore = DispatchSemaphore
var valueSemaphore : Semaphore
var congestionSemaphore : Semaphore
...
}
actor Queue
private var store : [T]
...
}
Here is my working code in full if you would like to see it.
Driver
//
// Driver.swift
//
import Foundation
@main
struct Driver {
static func main () async throws {
let pipeline = ProcessPipeline()
try? await pipeline.run ()
}
}
ProcessPipeline
//
// ProcessPipeline.swift
//
import Foundation
struct ProcessPipeline {
typealias MemberProcess = (Process, TaskPriority)
let processes : [MemberProcess]
init () {
// Channels
typealias Value = TestSignal
let channel1 = Channel <Value> (capacity: 5)
let channel2 = Channel <Value> (capacity: 1)
// Processes
// (Source) --- [Channel1] --- (Filter) --- [Channel2] --- (Sink)
let source = SourceProcess <Value> (output:channel1, signalGenerator: TestSignal.generateSignal)
let filter = FilterProcess <Value> (input:channel1, output:channel2)
let sink = SinkProcess <Value> (input:channel2)
self.processes = [(source, .low), (filter, .medium), (sink, .high)]
}
}
extension ProcessPipeline {
func run () async throws {
// helper
let hibernate = {() async -> Void in
let dt = UInt64 (Int.random (in: 0..<5) * 1_000_000_000)
try? await Task.sleep (nanoseconds: dt)
return
}
// helper
let startTask = { (process: Process, priority:TaskPriority) in
Task (priority: priority) {
while true {
await process.run ()
#if false
await Task.yield()
#else
await hibernate ()
#endif
}
}
}
// run each process in its own task
let tasks = processes.map {
startTask ($0, $1)
}
// wait here to keep the processes alive
for task in tasks {
_ = await task.result
}
print (#function, "done")
}
}
Channel
//
// Channel.swift
//
import Foundation
// ----------------------------------------------------------
//
class Channel <T:Signal> {
typealias Value = T
private let queue = Queue <Value> ()
private let capacity : Int
typealias Semaphore = DispatchSemaphore
var valueSemaphore : Semaphore
var congestionSemaphore : Semaphore
init (capacity: Int) {
self.capacity = capacity
self.valueSemaphore = Semaphore (value: 0)
self.congestionSemaphore = Semaphore (value: capacity)
}
func send (_ value: Value) async {
congestionSemaphore.wait()
await queue.enqueue (value)
valueSemaphore.signal()
}
func receive () async -> Value {
valueSemaphore.wait()
let value = await queue.deueue ()
congestionSemaphore.signal ()
return value
}
}
// ----------------------------------------------------------
//
extension Channel {
actor Queue <T> {
private var store : [T] = []
func enqueue (_ value: T) async {
store.append (value)
}
func deueue () async -> T {
store.removeFirst()
}
}
}
Process
//
// Process.swift
//
import Foundation
protocol Process {
func run () async
func print (diag v: String)
}
extension Process {
func print (diag v: String) {
Swift.print ("\(type (of:self)): \(v)")
}
}
// ----------------------------------------------------------
//
class SourceProcess <T: Signal> : Process {
typealias Value = Channel <T>.Value
private let output : Channel <Value>
private let signalGenerator: () -> Value
init (output: Channel <Value>, signalGenerator: @escaping () -> Value) {
self.output = output
self.signalGenerator = signalGenerator
}
func run () async {
let signal = signalGenerator ()
print (diag: "send \(signal)...")
await output.send (signal)
print (diag: "done")
}
}
// ----------------------------------------------------------
//
class SinkProcess <T:Signal> : Process {
typealias Value = Channel <T>.Value
private let input : Channel <Value>
init (input: Channel <Value>) {
self.input = input
}
func run () async {
print (diag: "receive...")
let signal = await input.receive ()
print (diag: "received \(signal)")
}
}
// ----------------------------------------------------------
//
class FilterProcess <T:Signal> : Process {
typealias Value = Channel <T>.Value
private let input : Channel <Value>
private let output : Channel <Value>
init (input: Channel <Value>, output: Channel <Value>) {
self.input = input
self.output = output
}
func run () async {
print (diag: "receive...")
let signal = await input.receive ()
print (diag: "received \(signal)")
// all-pass filter...
print (diag: "send...")
await output.send (signal)
print (diag: "done")
}
}
Signal
//
// Signal.swift
//
import Foundation
protocol Signal : Sendable {
static func generateSignal () -> Self
}
final class TestSignal: Signal {
typealias Store = [Int]
let value : Store
init (value: Store) {
self.value = value
}
static func generateSignal () -> TestSignal {
var value = Store ()
for _ in 1...8 {
let digit = Int.random (in: 0...1)
value.append (digit)
}
return TestSignal (value:value)
}
}
extension TestSignal: CustomStringConvertible {
var description: String {
return value.map {String ($0)}.joined()
}
}
As I have said earlier, if you make the Channel an actor and the Queue a class, then the program does nothing; it does not consume any cpu time at all (according to the Activity Monitor on macOS.)
I would like to know why?
Thank you.