Thoughts on a ConditionVariable type

Hey folks, in a couple of scenarios recently I've found myself needing condition variables, and I was wondering whether it might make sense to provide a ConditionVariable type as part of the standard library, following the recent addition of similar low-level synchronization primitives like Mutex.

Condition variables normally require a lock, but Mutex isn't useful here because it doesn't expose its underlying lock instance. So we'd either need to add new Mutex API (and I haven't thought of a good way to do that which is both safe and avoids conflating use cases) or introduce a new type. I'm suggesting the latter.
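To illustrate the limitation, here is a minimal sketch of how Mutex is typically used. It assumes a Swift 6 toolchain with the Synchronization module (and a recent enough deployment target on Apple platforms); `Counter` is a hypothetical name used only for illustration. The guarded state is reachable only through the closure passed to `withLock`, so there is no lock handle that could be handed to something like `pthread_cond_wait`.

```swift
import Synchronization

// Hypothetical example type; not part of any library.
final class Counter: Sendable {
    // The Mutex owns both the lock and the protected value.
    private let value = Mutex<Int>(0)

    // The only way to reach the value is inside the closure given to withLock;
    // the underlying platform lock is never exposed to the caller.
    func increment() -> Int {
        value.withLock { count in
            count += 1
            return count
        }
    }
}
```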

Here's a draft implementation as an example:

public struct ConditionVariable<T: ~Copyable>: ~Copyable {
    #if os(Windows)
    typealias LockType = SRWLOCK
    typealias ConditionVariableType = CONDITION_VARIABLE
    #elseif os(FreeBSD) || os(OpenBSD)
    typealias LockType = pthread_mutex_t?
    typealias ConditionVariableType = pthread_cond_t?
    #else
    typealias LockType = pthread_mutex_t
    typealias ConditionVariableType = pthread_cond_t
    #endif

    private nonisolated(unsafe) var state: T
    private nonisolated(unsafe) let lock = UnsafeMutablePointer<LockType>.allocate(capacity: 1)
    private nonisolated(unsafe) let condition = UnsafeMutablePointer<ConditionVariableType>.allocate(capacity: 1)

    init(_ state: consuming sending T) {
        self.state = state
        #if os(Windows)
        InitializeSRWLock(lock)
        InitializeConditionVariable(condition)
        #else
        pthread_mutex_init(lock, nil)
        pthread_cond_init(condition, nil)
        #endif
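    }

    deinit {
        // Release the OS primitives and the heap storage allocated for them.
        // SRW locks and Windows condition variables need no explicit destruction.
        #if !os(Windows)
        pthread_cond_destroy(condition)
        pthread_mutex_destroy(lock)
        #endif
        condition.deallocate()
        lock.deallocate()
    }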

    private func _lock() {
        #if os(Windows)
        AcquireSRWLockExclusive(lock)
        #else
        pthread_mutex_lock(lock)
        #endif
    }

    private func _unlock() {
        #if os(Windows)
        ReleaseSRWLockExclusive(lock)
        #else
        pthread_mutex_unlock(lock)
        #endif
    }

    private func _signal() {
        #if os(Windows)
        WakeConditionVariable(condition)
        #else
        pthread_cond_signal(condition)
        #endif
    }

    private func _signalAll() {
        #if os(Windows)
        WakeAllConditionVariable(condition)
        #else
        pthread_cond_broadcast(condition)
        #endif
    }

    private func _wait() {
        #if os(Windows)
        SleepConditionVariableSRW(condition, lock, INFINITE, 0)
        #else
        pthread_cond_wait(condition, lock)
        #endif
    }

    public mutating func signal<R, E>(_ block: (inout sending T) throws(E) -> R) throws(E) -> R {
        _lock()
        defer {
            _unlock()
        }
        defer {
            _signal()
        }
        return try block(&state)
    }

    public mutating func signalAll<R, E>(_ block: (inout sending T) throws(E) -> R) throws(E) -> R {
        _lock()
        defer {
            _unlock()
        }
        defer {
            _signalAll()
        }
        return try block(&state)
    }

    public mutating func wait<R, E>(_ block: (inout sending T) throws(E) -> R) throws(E) -> R {
        try wait(block, when: { _ in true })
    }

    public mutating func wait<R, E>(_ block: (inout sending T) throws(E) -> R, when: (inout sending T) -> Bool) throws(E) -> R {
        _lock()
        defer {
            _unlock()
        }
        defer {
            if when(&state) {
                _wait()
            }
        }
        return try block(&state)
    }
}

The primary use cases are those where work must be offloaded to dedicated threads outside the Swift Concurrency thread pool because it blocks. For example, a serial queue on a dedicated thread.

Simplified example:

fileprivate func SerialQueueMain(_ param: UnsafeMutableRawPointer?) {
    (Unmanaged<AnyObject>.fromOpaque(param!).takeRetainedValue() as! SerialQueue).main()
}

fileprivate final class SerialQueue: Sendable {
    private nonisolated(unsafe) var conditionVariable = ConditionVariable<[@Sendable () -> ()]>([])
    init() {
        let obj = Unmanaged.passRetained(self as AnyObject).toOpaque()
        #if os(Windows)
        CreateThread(nil, 0, { SerialQueueMain($0); return 0 }, obj, 0, nil)
        #else
        #if os(FreeBSD) || os(OpenBSD)
        var thread: pthread_t?
        #else
        var thread = pthread_t()
        #endif
        pthread_create(
            &thread,
            nil,
            { SerialQueueMain($0); return nil },
            obj)
        #endif
    }

    fileprivate func main() {
        while true {
            conditionVariable.wait { workItems in
                if !workItems.isEmpty {
                    workItems.removeFirst()()
                }
            } when: { workItems in
                workItems.isEmpty
            }
        }
    }

    public func run<T: Sendable>(_ work: @escaping @Sendable () -> T) async -> T {
        await withCheckedContinuation { continuation in
            conditionVariable.signal { workItems in
                workItems.append {
                    continuation.resume(returning: work())
                }
            }
        }
    }
}

// Usage
let queue = SerialQueue()
await queue.run { blocking_work() }
await queue.run { blocking_work() }

Another is the waitid loop for monitoring process exit on Unix-like platforms (swift-subprocess's Linux implementation could have benefited from a ConditionVariable type there).

This seems like something that would apply to almost any conceivable platform supported by Swift: the provided implementation should work on at least Windows, Darwin, Linux/Android, FreeBSD, and other POSIX or Unix-like platforms Swift may be ported to in the future. Not sure about WebAssembly, but it seems to have a Mutex implementation.

It looks like FreeBSD's _umtx_op (which underlies the Mutex implementation there) supports condition variables with wait/signal/broadcast operations, so a production implementation could be made to use that instead of pthread. I'm unsure if there are condition variable APIs associated with os_unfair_lock on Darwin or whether we simply need to fall back to pthread APIs there. I have no experience with futex on Linux.

I'm looking for feedback on the initial idea: whether this seems worth a formal proposal, input on naming, annotations, functionality, etc. Thanks!


Interface-wise, you probably don't want any of the public API to be mutating, since mutating operations require exclusive access, and you want a concurrency primitive to be sharable by its nature. You'd want to follow Mutex's implementation example and put the guarded state in something equivalent to the standard library's private _Cell type which can unsafely give mutable access to its contents once you've acquired the lock.
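A rough sketch of what a non-mutating interface could look like, under some loud assumptions: it swaps in a final class and Foundation's `NSCondition` (which bundles a lock with a condition variable) in place of a noncopyable struct backed by a `_Cell`-like storage, and `ConditionGuarded` and its method names are hypothetical. It also uses the conventional pattern of re-checking a predicate in a loop before running the body, which differs from the draft's `wait(_:when:)`.

```swift
import Foundation

// Hypothetical type for illustration; not a standard library API.
// @unchecked Sendable: `state` is only ever touched while `condition` is locked.
final class ConditionGuarded<State>: @unchecked Sendable {
    private let condition = NSCondition()
    private var state: State

    init(_ initial: State) {
        state = initial
    }

    // Mutates the state under the lock, then wakes one waiter.
    // Defers run in reverse order: signal first, then unlock.
    func signal<R>(_ body: (inout State) throws -> R) rethrows -> R {
        condition.lock()
        defer { condition.unlock() }
        defer { condition.signal() }
        return try body(&state)
    }

    // Blocks until `isReady` returns true, then runs `body` under the lock.
    // The loop re-checks the predicate to tolerate spurious wakeups.
    func wait<R>(until isReady: (State) -> Bool,
                 then body: (inout State) throws -> R) rethrows -> R {
        condition.lock()
        defer { condition.unlock() }
        while !isReady(state) {
            condition.wait()
        }
        return try body(&state)
    }
}
```

Because none of the methods are mutating, an instance can be stored in a `let` and shared between threads, for example with a consumer calling `wait(until: { !$0.isEmpty }) { $0.removeFirst() }` while producers call `signal { $0.append(item) }`.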

On Apple's UI platforms, traditional condition variables are frowned upon since they have no way to propagate priority/QoS, and it's my understanding that we want to make sure the ultimate "right thing" for the offloading-blocking-work use case can do that effectively. There could be other platforms and use cases where condition variables are nonetheless a useful tool.


Right. I would encourage you to try to describe your use cases in terms of a higher-level primitive in which it’s always clear what a thread is waiting for, rather than the more low-level condvar abstraction which simply gives you the ability to wait.

Priority inversion is not a Darwin-specific concept, though.


Would love to adopt such a thing since, as you know, we depend on a condition lock or two in Swift Testing.

That said, it'd be nice if we could find a Swiftier way to expose the functionality that a condition lock enables.

Couldn't this be done with an actor and Continuation in structured concurrency?

This is something you'd use in contexts where you can't use structured concurrency (e.g. because you're calling low level OS APIs that are blocking and have no non-blocking alternative).
