[Review requested] AsyncLock

This is probably at least tangentially related to Unsafe `withTaskGroup` optimization?, but I'm outside my comfort zone here so I'd like to ask for some feedback before filing another bug.

Let's assume that my goal is to create an async unfair lock. (cue dramatic pause...). Now that the Boos have mostly been replaced by mumbling about lack of forward progress, I don't necessarily advocate for programming in this style, but I do feel that actors being re-entrant is surprising to a lot of developers.

import Atomics
import Foundation

final class AsyncUnfairLock<Cast: ContinuationCaster>: Sendable {
    /* invariants:
     if 0, unlocked
     if 1, locked
     if >1, the continuation that is at the top of the stack */
    let lock = ManagedAtomic<Int>(0)
    func callAsFunction<T>(_ block: @Sendable () async throws -> T) async rethrows -> T {
        var old = 0
        /* When locking, the current task atomically:
         1. pushes itself to the top of the stack
         2. retains the previous top of the stack in a local variable

         Then, if the previous top of the stack is empty and the lock is free, it immediately
         resumes the task at the top of the stack (which is either itself, or some other task that
         has replaced it since) */
        await withCheckedContinuation { (new: Continuation) in
            // old = swap(new)
            old = lock.exchange(Cast.toInt(new), ordering: .relaxed)
            if old == 0 {
                // swap(1).resume()
                Cast.toContinuation(lock.exchange(1, ordering: .relaxed)).resume()
            }
        }

        /* Before unlocking, if the current task has replaced another, it resumes it (granting it
         the lock instead).

         Otherwise, it atomically either:
         1. unlocks (if no other task has expressed interest) OR
         2. resumes the top of the stack, granting it the lock instead */
        defer {
            if old > 1 {
                // old.resume()
                Cast.toContinuation(old).resume()
            }
            // cmpxchg(1 -> 0)
            else if !lock.compareExchange(expected: 1, desired: 0, ordering: .relaxed).exchanged {
                // swap(1).resume()
                Cast.toContinuation(lock.exchange(1, ordering: .relaxed)).resume()
            }
        }
        return try await block()
    }
}

The above code requires some way to turn a continuation into an atomic value. Since they are structs and MemoryLayout<Continuation>.stride == 8, it should be possible to cast the bits (from testing it seems that they are always aligned to 16 bytes which means that there's even tag bits to work with for more advanced cases). In practice, the Checked flavor seems to do some value-type reference counting magic under the hood and will loudly complain when it's stored as an Int, so for the sake of asserting that the atomic code is at least somewhat correct, we append all continuations to a resizable buffer and return the index using a mutex instead (offset by some base address 1234 because 0 and 1 are special cased values):

typealias Continuation = CheckedContinuation<Void, Never>
struct UnsafeCast: ContinuationCaster {
    static func toInt(_ val: Continuation) -> Int { unsafeBitCast(val, to: Int.self) }
    static func toContinuation(_ val: Int) -> Continuation { unsafeBitCast(val, to: Continuation.self) }
}

protocol ContinuationCaster: Sendable {
    static func toInt(_ val: Continuation) -> Int
    static func toContinuation(_ val: Int) -> Continuation
}

class SafeCast: ContinuationCaster {
    static let mut = Mutex()
    static func toInt(_ val: Continuation) -> Int {
        mut {
            let ptr = buffer.count
            buffer.append(val)
            return ptr + 1234
        }
    }
    static var buffer = [Continuation]()
    static func toContinuation(_ val: Int) -> Continuation { mut { buffer[val - 1234] } }
}

class Mutex: @unchecked Sendable {
    private var val: UnsafeMutablePointer<pthread_mutex_t>
    init() {
        val = .allocate(capacity: 1)
        pthread_mutex_init(val, nil)
    }
    deinit {
        pthread_mutex_destroy(val)
        val.deallocate()
    }
    func callAsFunction<T>(_ block: () throws -> T) rethrows -> T {
        pthread_mutex_lock(val)
        defer { pthread_mutex_unlock(val) }
        return try block()
    }
}

This concludes the review portion. I'm not convinced that my lock is correct, nor that it's the fastest way to implement it in these circumstances. At one swap per lock under high contention and an extra cas + swap for uncontested unlock, it should scale reasonably well if correct.


And herein lies the problem and possible bug. Using the following test code:

class Test: @unchecked Sendable {
    let limit = 1_000_000
    let lock = AsyncUnfairLock<SafeCast>()
    let atomic = ManagedAtomic<Int>(0)

    var value = 0 // unsafe but should be safe with a lock
    func bench() async {
        await withTaskGroup(of: Void.self) { group in
            for _ in 0..<limit {
                group.addTask {
                    await self.lock {
                        self.value += 1
                    }

                    // sanity check: all tasks have returned
                    if self.atomic.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) == self.limit {
                        print("all tasks done \(self.value)")
                    }
                }
            }
        }
    }
}
let res = Test()
await res.bench()
print("task group done", res.value, res.limit)

I get some rather weird results, depending on -O, continuation flavor and bitcast method:

  1. debug, safecast, checked: deadlock
  2. debug, safecast, unchecked: deadlock
  3. debug, unsafecast, checked: exit 11 + continuation misuse
  4. debug, unsafecast, unchecked: task group done 1000000 1000000 (e.g. the presumably correct version)
  5. release, safecast, checked: USUALLY deadlock but sometimes exit code 4 (while I can't confirm this because it happens rarely, I did encounter a crash in debug pointing to Swift/ContiguousArrayBuffer.swift:600: Fatal error: Index out of range which may have been this error) EDIT: @jrose probably caught this in his comment
  6. release, safecast, unchecked: deadlock
  7. release, unsafecast, checked: exit 11 + continuation misuse
  8. release, unsafecast, unchecked: deadlock

In all scenarios (except, as described above, unsafecast + checked), I got all tasks done 1000000 printed to stdout (using .relaxed, .acquiring, .acquiringAndReleasing and .sequentiallyConsistent which suggests that:

  1. the continuation invariants are preserved
  2. the async lock prevents mutual access, async returns and, runs more sync code afterwards but:
  3. at least one task fails to notify its return, or the task group fails to acknowledge all returns

Reducing the number of concurrent invocations to let limit = 10_000 seems to work in most if not all cases.

1 Like

Without commenting on the rest of it, your toContinuation needs to lock as well, or it will fail when the buffer of continuations is reallocated.

(But also you can’t store continuations in an ever-growing array without running out of memory.)

P.S. Thread Sanitizer is your friend!

2 Likes

Yeah, the fundamental problem is difficult, hence my use of stacks instead of FIFOs

Okay, so it turns out that this isn't a deadlock as much as a busy little beaver:

After the first 2 or so seconds, all threads went to sleep except for thread 2, which was the one running withTaskGroup. I've noticed that memory usage was slowly ticking down (at a rate of about 1 megabyte per minute), so I paused it a few times to see what it was doing and it was always stopping at

libswift_Concurrency.dylib`swift::TaskGroup::removeChildTask:
0x7ffc077c9aef <+31>: cmpq   %rsi, %rcx
0x7ffc077c9af2 <+34>: jne    0x7ffc077c9ae0            ; <+16>

So I let it chug along while I looked at TaskGroup.cpp and it did in fact finish after around 45 minutes:

all tasks done 1000000

task group done 1000000 1000000
Program ended with exit code: 0

Long story short is that this grabs a lock on the group, then calls this which in turn calls this which is a... oh boy, linked list.

What's happening here is that my lock (while probably correct) exhibits pathological behavior when mixed with taskGroup:

Each task starts up by replacing the previous task from the top of the stack and, because the runtime can spawn tasks faster than it can dispose of them, after the first few stacks you end up with a big chungus linked list which is evicted LIFO.

On average, it takes around half a mil cache misses to evict a task and I can confirm this experimentally: memory downtick sped up towards the end basically by a factor of n^2.

Since I know what's causing this, I'll come up with a simpler test case and file it as a performance improvement. After some arbitrary number of active tasks (say 1000) taskGroup should probably switch to a hash table instead.

2 Likes

Yeah; we have quite some room for optimizations in the group. Thanks for filing the related issue.

1 Like