Hey folks, in a couple of scenarios recently I've found myself needing condition variables, and I was wondering whether it might make sense to provide a `ConditionVariable` type as part of the standard library, following recent additions of similar low-level synchronization primitives like `Mutex`.
Condition variables normally require a lock, but `Mutex` isn't useful here because it doesn't expose its underlying lock instance, so we'd either need to add new `Mutex` API (and I haven't thought of a good way to do that which is both safe and doesn't conflate use cases) or introduce a new type. I'm suggesting the latter.
Here's a draft implementation as an example:
/// A condition variable bundled with the lock and the state that lock protects.
///
/// Every public operation acquires the lock, hands the protected state to the
/// caller's closure as `inout`, and releases the lock before returning.
/// `signal`/`signalAll` additionally wake waiters; `wait` additionally puts the
/// calling thread to sleep on the condition.
///
/// The native primitives live in heap allocations so that their addresses stay
/// stable for the lifetime of the value; they are torn down in `deinit`.
public struct ConditionVariable<T: ~Copyable>: ~Copyable {
#if os(Windows)
    typealias LockType = SRWLOCK
    typealias ConditionVariableType = CONDITION_VARIABLE
#elseif os(FreeBSD) || os(OpenBSD)
    typealias LockType = pthread_mutex_t?
    typealias ConditionVariableType = pthread_cond_t?
#else
    typealias LockType = pthread_mutex_t
    typealias ConditionVariableType = pthread_cond_t
#endif

    /// The protected value. Only touched while `lock` is held.
    private nonisolated(unsafe) var state: T
    /// Heap storage for the platform lock.
    private nonisolated(unsafe) let lock = UnsafeMutablePointer<LockType>.allocate(capacity: 1)
    /// Heap storage for the platform condition variable.
    private nonisolated(unsafe) let condition = UnsafeMutablePointer<ConditionVariableType>.allocate(capacity: 1)

    /// Creates a condition variable protecting `state`.
    ///
    /// - Parameter state: The initial protected value; ownership is transferred in.
    public init(_ state: consuming sending T) {
        self.state = state
#if os(Windows)
        InitializeSRWLock(lock)
        InitializeConditionVariable(condition)
#else
        // Using a primitive whose initialization failed is undefined behavior,
        // so fail loudly here rather than later. The calls are kept outside the
        // `precondition` expressions so they are always evaluated.
        let lockStatus = pthread_mutex_init(lock, nil)
        precondition(lockStatus == 0, "pthread_mutex_init failed")
        let conditionStatus = pthread_cond_init(condition, nil)
        precondition(conditionStatus == 0, "pthread_cond_init failed")
#endif
    }

    deinit {
#if !os(Windows)
        // The Windows primitives have no destroy call; the pthread ones do.
        pthread_cond_destroy(condition)
        pthread_mutex_destroy(lock)
#endif
        // Release the heap storage allocated by the property initializers.
        condition.deallocate()
        lock.deallocate()
    }

    /// Acquires the lock exclusively, blocking until it is available.
    private func _lock() {
#if os(Windows)
        AcquireSRWLockExclusive(lock)
#else
        pthread_mutex_lock(lock)
#endif
    }

    /// Releases the lock acquired by `_lock()`.
    private func _unlock() {
#if os(Windows)
        ReleaseSRWLockExclusive(lock)
#else
        pthread_mutex_unlock(lock)
#endif
    }

    /// Wakes one thread waiting on the condition.
    private func _signal() {
#if os(Windows)
        WakeConditionVariable(condition)
#else
        pthread_cond_signal(condition)
#endif
    }

    /// Wakes every thread waiting on the condition.
    private func _signalAll() {
#if os(Windows)
        WakeAllConditionVariable(condition)
#else
        pthread_cond_broadcast(condition)
#endif
    }

    /// Sleeps on the condition with no timeout. Must be called with the lock held;
    /// the lock is passed to the platform wait call alongside the condition.
    private func _wait() {
#if os(Windows)
        SleepConditionVariableSRW(condition, lock, INFINITE, 0)
#else
        pthread_cond_wait(condition, lock)
#endif
    }

    /// Runs `block` on the protected state under the lock, then wakes one waiter.
    ///
    /// The waiter is signalled while the lock is still held, and is signalled even
    /// when `block` throws.
    ///
    /// - Parameter block: Closure given exclusive access to the state.
    /// - Returns: Whatever `block` returns.
    /// - Throws: Whatever `block` throws.
    public mutating func signal<R, E>(_ block: (inout sending T) throws(E) -> R) throws(E) -> R {
        _lock()
        // Deferred actions run in reverse order: signal first, then unlock.
        defer {
            _unlock()
        }
        defer {
            _signal()
        }
        return try block(&state)
    }

    /// Runs `block` on the protected state under the lock, then wakes all waiters.
    ///
    /// The waiters are signalled while the lock is still held, and are signalled
    /// even when `block` throws.
    ///
    /// - Parameter block: Closure given exclusive access to the state.
    /// - Returns: Whatever `block` returns.
    /// - Throws: Whatever `block` throws.
    public mutating func signalAll<R, E>(_ block: (inout sending T) throws(E) -> R) throws(E) -> R {
        _lock()
        // Deferred actions run in reverse order: broadcast first, then unlock.
        defer {
            _unlock()
        }
        defer {
            _signalAll()
        }
        return try block(&state)
    }

    /// Runs `block` under the lock and then unconditionally sleeps on the condition
    /// once before returning.
    ///
    /// - Parameter block: Closure given exclusive access to the state.
    /// - Returns: Whatever `block` returns.
    /// - Throws: Whatever `block` throws.
    public mutating func wait<R, E>(_ block: (inout sending T) throws(E) -> R) throws(E) -> R {
        try wait(block, when: { _ in true })
    }

    /// Runs `block` under the lock, then sleeps on the condition if `when` returns
    /// `true` for the resulting state.
    ///
    /// The predicate is evaluated exactly once, after `block` has run (including
    /// when `block` throws). The thread sleeps at most once and the predicate is
    /// not re-evaluated after waking. Condition-variable waits are allowed to wake
    /// spuriously, so callers are expected to call `wait` from a loop that
    /// re-inspects the state.
    ///
    /// - Parameters:
    ///   - block: Closure given exclusive access to the state.
    ///   - when: Predicate deciding whether to sleep after `block` has run.
    /// - Returns: Whatever `block` returns.
    /// - Throws: Whatever `block` throws.
    public mutating func wait<R, E>(_ block: (inout sending T) throws(E) -> R, when: (inout sending T) -> Bool) throws(E) -> R {
        _lock()
        // Deferred actions run in reverse order: (maybe) sleep first, then unlock.
        defer {
            _unlock()
        }
        defer {
            if when(&state) {
                _wait()
            }
        }
        return try block(&state)
    }
}
The primary use cases are those where work must be offloaded to dedicated threads outside the Swift Concurrency thread pool because it blocks. For example, a serial queue running on a dedicated thread.
Simplified example:
/// Thread entry trampoline: recovers the `SerialQueue` from the opaque pointer
/// handed to the thread-creation call and runs its worker loop.
///
/// - Parameter param: Opaque pointer to the retained queue object. The retain is
///   consumed here. A `nil` pointer or an object of another type traps.
fileprivate func SerialQueueMain(_ param: UnsafeMutableRawPointer?) {
    let unmanagedQueue = Unmanaged<AnyObject>.fromOpaque(param!)
    let queue = unmanagedQueue.takeRetainedValue() as! SerialQueue
    queue.main()
}
/// A serial work queue that executes submitted closures one at a time on a
/// dedicated thread created in `init`.
fileprivate final class SerialQueue: Sendable {
    /// Pending work items, guarded by the condition variable's lock.
    private nonisolated(unsafe) var conditionVariable = ConditionVariable<[@Sendable () -> ()]>([])

    /// Creates the queue and starts its worker thread.
    init() {
        // Hand the thread a +1 reference; `SerialQueueMain` consumes it.
        let obj = Unmanaged.passRetained(self as AnyObject).toOpaque()
        // NOTE(review): the result of thread creation is not checked on either
        // platform; if it fails, submitted work will never run.
#if os(Windows)
        CreateThread(nil, 0, { SerialQueueMain($0); return 0 }, obj, 0, nil)
#else
#if os(FreeBSD) || os(OpenBSD)
        var thread: pthread_t?
#else
        var thread = pthread_t()
#endif
        pthread_create(
            &thread,
            nil,
            { SerialQueueMain($0); return nil },
            obj)
#endif
    }

    /// Worker loop: repeatedly dequeues the oldest work item and runs it, sleeping
    /// on the condition variable while the queue is empty. Never returns.
    fileprivate func main() {
        while true {
            // Job taken from the queue during this pass, if any.
            var next: (@Sendable () -> ())? = nil
            conditionVariable.wait { workItems in
                if !workItems.isEmpty {
                    next = workItems.removeFirst()
                }
            } when: { _ in
                // Sleep only when there was nothing to take. The emptiness check
                // above ran under the same lock acquisition.
                next == nil
            }
            // Run the job after the lock has been released, so that `run` callers
            // are not blocked on the lock while potentially blocking work executes.
            next?()
        }
    }

    /// Submits `work` to the queue and suspends until it has run.
    ///
    /// - Parameter work: The closure to execute on the queue's thread.
    /// - Returns: The value produced by `work`.
    public func run<T: Sendable>(_ work: @escaping @Sendable () -> T) async -> T {
        await withCheckedContinuation { continuation in
            // Enqueue under the lock and wake one waiter.
            conditionVariable.signal { workItems in
                workItems.append {
                    continuation.resume(returning: work())
                }
            }
        }
    }
}
// Usage: submit two blocking jobs, awaiting each one before submitting the next.
let queue = SerialQueue()
for _ in 0..<2 {
    await queue.run { blocking_work() }
}
Another use case is the `waitid` loop for monitoring process exit on Unix-like platforms (swift-subprocess's Linux implementation could have benefited from a `ConditionVariable` type there).
This seems like something that would apply to almost any conceivable platform supported by Swift: the provided implementation should work on at least Windows, Darwin, Linux/Android, FreeBSD, and other POSIX or Unix-like platforms Swift may be ported to in the future. Not sure about WebAssembly, but it seems to have a Mutex implementation.
It looks like FreeBSD's _umtx_op (which is the underlying Mutex lock type) supports condition variables with wait/signal/broadcast operations, so a production implementation could be made to use that instead of pthread. I'm unsure if there are condition variable APIs associated with os_unfair_lock on Darwin or whether we simply need to fall back to pthread APIs there. Linux/futex I have no experience with.
I'm looking for feedback on the initial idea: whether this seems worth a formal proposal, input on naming, annotations, functionality, etc. Thanks!