Actor Races

One can define a semaphore with actors.

In the sample code below, a Semaphore is created with the number of jobs in can run concurrently (one, two, one hundred, just as many as you want).

To run a job, you can call:

// One concurrent job
let semaphore = Semaphore(value: 1)

Task {
    // wait for a slot
    await semaphore.wait()
    // run
    await <your async function>
    // signal end of job
    await semaphore.signal()
}

As a convenience, you can write:

// One concurrent job
let semaphore = Semaphore(value: 1)

Task {
    await semaphore.run (
        await <your async function>
    }
}

See a full sample code below (this a quick draft, I'm not sure this is 100% reliable):

public actor Semaphore {
    private var value: Int
    private var continuations: [UnsafeContinuation<Void, Never>] = []

    init(value: Int) {
        self.value = value
    }

    /// Waits for, or decrements, a semaphore.
    ///
    /// Decrement the counting semaphore. If the resulting value is less than
    /// zero, this function waits for a signal to occur before returning.
    public func wait() async {
        if value <= 0 {
            await withUnsafeContinuation { continuation in
                continuations.insert(continuation, at: 0)
            }
        }
        assert(value > 0)
        value -= 1
    }

    /// Signals (increments) a semaphore.
    ///
    /// Increment the counting semaphore. If the previous value was less than
    /// zero, this function wakes a task currently waiting in ``wait()``.
    @discardableResult
    public func signal() -> Int {
        value += 1
        if value > 0, let continuation = continuations.popLast() {
            continuation.resume()
        }
        return value
    }

    /// Convenience method that waits, run an async function, and signals.
    public func run<T>(execute: @Sendable @escaping () async throws -> T) async rethrows -> T {
        await wait()
        defer { signal() }
        return try await execute()
    }
}

// Demo

Task {
    do {
        print("Start, 1 concurrent task")
        let semaphore = Semaphore(value: 1)
        await withThrowingTaskGroup(of: Void.self) { group in
            for i in 0..<10 {
                group.addTask {
                    try await semaphore.run {
                        print("Start task \(i)")
                        try await Task.sleep(nanoseconds: 1_000_000_000)
                        print("End task \(i)")
                    }
                }
            }
        }
        print("Done")
        await semaphore.run {
            print("One last because it's fun")
        }
    }
    
    do {
        print("Start, 2 concurrent tasks")
        let semaphore = Semaphore(value: 2)
        await withThrowingTaskGroup(of: Void.self) { group in
            for i in 0..<10 {
                group.addTask {
                    try await semaphore.run {
                        print("Start task \(i)")
                        try await Task.sleep(nanoseconds: 1_000_000_000)
                        print("End task \(i)")
                    }
                }
            }
        }
        print("Done")
        await semaphore.run {
            print("One last because it's fun")
        }
    }
    
    do {
        print("Start, 100 concurrent tasks")
        let semaphore = Semaphore(value: 100)
        await withThrowingTaskGroup(of: Void.self) { group in
            for i in 0..<10 {
                group.addTask {
                    try await semaphore.run {
                        print("Start task \(i)")
                        try await Task.sleep(nanoseconds: 1_000_000_000)
                        print("End task \(i)")
                    }
                }
            }
        }
        print("Done")
        await semaphore.run {
            print("One last because it's fun")
        }
    }
}
6 Likes