One can define a semaphore with actors.
In the sample code below, a Semaphore is created with the number of jobs in can run concurrently (one, two, one hundred, just as many as you want).
To run a job, you can call:
// One concurrent job
let semaphore = Semaphore(value: 1)
Task {
// wait for a slot
await semaphore.wait()
// run
await <your async function>
// signal end of job
await semaphore.signal()
}
As a convenience, you can write:
// One concurrent job
let semaphore = Semaphore(value: 1)
Task {
await semaphore.run (
await <your async function>
}
}
See a full sample code below (this a quick draft, I'm not sure this is 100% reliable):
public actor Semaphore {
private var value: Int
private var continuations: [UnsafeContinuation<Void, Never>] = []
init(value: Int) {
self.value = value
}
/// Waits for, or decrements, a semaphore.
///
/// Decrement the counting semaphore. If the resulting value is less than
/// zero, this function waits for a signal to occur before returning.
public func wait() async {
if value <= 0 {
await withUnsafeContinuation { continuation in
continuations.insert(continuation, at: 0)
}
}
assert(value > 0)
value -= 1
}
/// Signals (increments) a semaphore.
///
/// Increment the counting semaphore. If the previous value was less than
/// zero, this function wakes a task currently waiting in ``wait()``.
@discardableResult
public func signal() -> Int {
value += 1
if value > 0, let continuation = continuations.popLast() {
continuation.resume()
}
return value
}
/// Convenience method that waits, run an async function, and signals.
public func run<T>(execute: @Sendable @escaping () async throws -> T) async rethrows -> T {
await wait()
defer { signal() }
return try await execute()
}
}
// Demo
Task {
do {
print("Start, 1 concurrent task")
let semaphore = Semaphore(value: 1)
await withThrowingTaskGroup(of: Void.self) { group in
for i in 0..<10 {
group.addTask {
try await semaphore.run {
print("Start task \(i)")
try await Task.sleep(nanoseconds: 1_000_000_000)
print("End task \(i)")
}
}
}
}
print("Done")
await semaphore.run {
print("One last because it's fun")
}
}
do {
print("Start, 2 concurrent tasks")
let semaphore = Semaphore(value: 2)
await withThrowingTaskGroup(of: Void.self) { group in
for i in 0..<10 {
group.addTask {
try await semaphore.run {
print("Start task \(i)")
try await Task.sleep(nanoseconds: 1_000_000_000)
print("End task \(i)")
}
}
}
}
print("Done")
await semaphore.run {
print("One last because it's fun")
}
}
do {
print("Start, 100 concurrent tasks")
let semaphore = Semaphore(value: 100)
await withThrowingTaskGroup(of: Void.self) { group in
for i in 0..<10 {
group.addTask {
try await semaphore.run {
print("Start task \(i)")
try await Task.sleep(nanoseconds: 1_000_000_000)
print("End task \(i)")
}
}
}
}
print("Done")
await semaphore.run {
print("One last because it's fun")
}
}
}