This is probably at least tangentially related to "Unsafe `withTaskGroup` optimization?", but I'm outside my comfort zone here, so I'd like to ask for some feedback before filing another bug.

Let's assume that my goal is to create an async unfair lock (cue dramatic pause...). Now that the boos have mostly been replaced by mumbling about the lack of forward progress, I don't necessarily advocate for programming in this style, but I do feel that actors being re-entrant is surprising to a lot of developers.
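For context, here is a minimal, purely illustrative sketch of that surprise (hypothetical names, not part of the lock below): an `await` inside an actor method is a suspension point where other calls to the same actor can interleave, so a read-modify-write that spans the `await` can lose updates.

```swift
// Hypothetical example of actor re-entrancy, not part of the lock below.
actor Counter {
    var value = 0

    func incrementSlowly() async {
        let before = value
        // Suspension point: other calls to this actor may run here (re-entrancy).
        try? await Task.sleep(nanoseconds: 1_000_000)
        // May overwrite increments that interleaved during the sleep.
        value = before + 1
    }
}
```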
import Atomics
import Foundation
final class AsyncUnfairLock<Cast: ContinuationCaster>: Sendable {
    /* invariants:
       if 0, unlocked
       if 1, locked
       if >1, the continuation that is at the top of the stack */
    let lock = ManagedAtomic<Int>(0)

    func callAsFunction<T>(_ block: @Sendable () async throws -> T) async rethrows -> T {
        var old = 0
        /* When locking, the current task atomically:
           1. pushes itself to the top of the stack
           2. retains the previous top of the stack in a local variable
           Then, if the previous top of the stack is empty and the lock is free, it immediately
           resumes the task at the top of the stack (which is either itself, or some other task that
           has replaced it since) */
        await withCheckedContinuation { (new: Continuation) in
            // old = swap(new)
            old = lock.exchange(Cast.toInt(new), ordering: .relaxed)
            if old == 0 {
                // swap(1).resume()
                Cast.toContinuation(lock.exchange(1, ordering: .relaxed)).resume()
            }
        }
        /* Before unlocking, if the current task has replaced another, it resumes it (granting it
           the lock instead).
           Otherwise, it atomically either:
           1. unlocks (if no other task has expressed interest) OR
           2. resumes the top of the stack, granting it the lock instead */
        defer {
            if old > 1 {
                // old.resume()
                Cast.toContinuation(old).resume()
            }
            // cmpxchg(1 -> 0)
            else if !lock.compareExchange(expected: 1, desired: 0, ordering: .relaxed).exchanged {
                // swap(1).resume()
                Cast.toContinuation(lock.exchange(1, ordering: .relaxed)).resume()
            }
        }
        return try await block()
    }
}
The above code requires some way to turn a continuation into an atomic value. Since continuations are structs and `MemoryLayout<Continuation>.stride == 8`, it should be possible to cast the bits (from testing, they seem to always be aligned to 16 bytes, which means there are even tag bits to work with for more advanced cases). In practice, the Checked flavor seems to do some value-type reference-counting magic under the hood and will loudly complain when it's stored as an `Int`, so for the sake of asserting that the atomic code is at least somewhat correct, we instead append all continuations to a resizable buffer under a mutex and return the index (offset by some base address `1234`, because `0` and `1` are special-cased values):
typealias Continuation = CheckedContinuation<Void, Never>

protocol ContinuationCaster: Sendable {
    static func toInt(_ val: Continuation) -> Int
    static func toContinuation(_ val: Int) -> Continuation
}

struct UnsafeCast: ContinuationCaster {
    static func toInt(_ val: Continuation) -> Int { unsafeBitCast(val, to: Int.self) }
    static func toContinuation(_ val: Int) -> Continuation { unsafeBitCast(val, to: Continuation.self) }
}

class SafeCast: ContinuationCaster {
    static let mut = Mutex()
    static var buffer = [Continuation]()

    static func toInt(_ val: Continuation) -> Int {
        mut {
            let ptr = buffer.count
            buffer.append(val)
            return ptr + 1234
        }
    }

    static func toContinuation(_ val: Int) -> Continuation { mut { buffer[val - 1234] } }
}
class Mutex: @unchecked Sendable {
    private var val: UnsafeMutablePointer<pthread_mutex_t>

    init() {
        val = .allocate(capacity: 1)
        pthread_mutex_init(val, nil)
    }

    deinit {
        pthread_mutex_destroy(val)
        val.deallocate()
    }

    func callAsFunction<T>(_ block: () throws -> T) rethrows -> T {
        pthread_mutex_lock(val)
        defer { pthread_mutex_unlock(val) }
        return try block()
    }
}
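As an aside, the stride and alignment claims above can be spot-checked with a quick sketch like the following (the function name is made up; the values in the comments are what testing suggested on a 64-bit platform, not documented guarantees):

```swift
// Quick spot check of the layout assumptions (not part of the lock itself).
func inspectContinuationLayout() async {
    // The post relies on this being 8 (one word) on 64-bit platforms.
    print("stride:", MemoryLayout<Continuation>.stride)
    await withCheckedContinuation { (c: Continuation) in
        // Peek at the bit pattern without consuming the continuation.
        let bits = unsafeBitCast(c, to: Int.self)
        print("low bits:", bits & 0xF) // 0 in my tests, i.e. apparently 16-byte aligned
        c.resume()
    }
}
```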
This concludes the review portion. I'm not convinced that my lock is correct, nor that it's the fastest way to implement one under these circumstances. At one swap per lock acquisition under high contention, plus a CAS (and another swap when contended) per unlock, it should scale reasonably well, if correct.
And herein lies the problem and possible bug. Using the following test code:
class Test: @unchecked Sendable {
    let limit = 1_000_000
    let lock = AsyncUnfairLock<SafeCast>()
    let atomic = ManagedAtomic<Int>(0)
    var value = 0 // unsafe but should be safe with a lock

    func bench() async {
        await withTaskGroup(of: Void.self) { group in
            for _ in 0..<limit {
                group.addTask {
                    await self.lock {
                        self.value += 1
                    }
                    // sanity check: all tasks have returned
                    if self.atomic.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) == self.limit {
                        print("all tasks done \(self.value)")
                    }
                }
            }
        }
    }
}
let res = Test()
await res.bench()
print("task group done", res.value, res.limit)
I get some rather weird results, depending on `-O`, continuation flavor, and bitcast method:
- debug, safecast, checked: deadlock
- debug, safecast, unchecked: deadlock
- debug, unsafecast, checked: exit 11 + continuation misuse
- debug, unsafecast, unchecked: task group done 1000000 1000000 (i.e. the presumably correct version)
- release, safecast, checked: USUALLY deadlock, but sometimes exit code 4 (while I can't confirm this because it happens rarely, I did encounter a crash in debug pointing to `Swift/ContiguousArrayBuffer.swift:600: Fatal error: Index out of range`, which may have been this error; EDIT: @jrose probably caught this in his comment)
- release, safecast, unchecked: deadlock
- release, unsafecast, checked: exit 11 + continuation misuse
- release, unsafecast, unchecked: deadlock
In all scenarios (except, as described above, unsafecast + checked), I got `all tasks done 1000000` printed to stdout (using `.relaxed`, `.acquiring`, `.acquiringAndReleasing`, and `.sequentiallyConsistent` orderings), which suggests that:
- the continuation invariants are preserved
- the async lock provides mutual exclusion, the async call returns, and more sync code runs afterwards, but:
- at least one task fails to notify its return, or the task group fails to acknowledge all returns
Reducing the number of concurrent invocations to `let limit = 10_000` seems to work in most, if not all, cases.
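To help narrow down whether the hang lives in the lock or in the task group itself, one variant I'd try is dropping the lock entirely and keeping only the group bookkeeping (a sketch with a made-up name; if this also misbehaves at a million tasks, the lock is probably not the culprit):

```swift
// Hypothetical isolation test: same task-group shape, no custom lock involved.
func benchWithoutLock() async {
    let limit = 1_000_000
    let counter = ManagedAtomic<Int>(0)
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<limit {
            group.addTask {
                // No suspension inside the child task: only the group machinery is exercised.
                if counter.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) == limit {
                    print("all tasks done")
                }
            }
        }
    }
    print("task group done", counter.load(ordering: .sequentiallyConsistent))
}
```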