I’m encountering a deadlock with the following code (Swift 6.2.3), which uses a single `Mutex` and feels like it should work, since the acquires/releases are well-ordered.
```swift
import Synchronization

// Synchronous access to adding/removing continuations is a must here,
// so using a Mutex.
let continuations = Mutex(Set<AsyncStream<Void>.Continuation>())

func sendToAll(_ index: Int) {
    // Print statements just to see where it's getting deadlocked
    print("Will acquire mutex for send \(index)")
    continuations.withLock {
        print("Has acquired mutex for send \(index)")
        for continuation in $0 {
            continuation.yield()
        }
        print("Will release mutex for send \(index)")
    }
}

func makeTask(for index: Int) -> Task<Void, Never> {
    let stream = AsyncStream { continuation in
        continuation.onTermination = { _ in
            continuations.withLock {
                _ = $0.remove(continuation)
            }
        }
        continuations.withLock {
            _ = $0.insert(continuation)
        }
    }
    return Task {
        for await _ in stream {}
    }
}

let count = 100
let tasks = (0..<count).map(makeTask(for:))

await withTaskGroup { group in
    for index in 0..<count {
        group.addTask {
            tasks[index].cancel()
        }
        group.addTask {
            sendToAll(index)
        }
    }
}

print("END") // Never called
```
I want to add/remove `AsyncStream` continuations synchronously from any thread, so I’m using a mutex to do so. Cancelling and yielding values to those continuations concurrently produces a state where `continuation.yield()` blocks its thread permanently, deadlocking the program. The output (nondeterministic; box-drawing annotations added by me) is:
```
┌─ Will acquire mutex for cancel 0
│  Will acquire mutex for send 0
│  Will acquire mutex for send 1
│  Will acquire mutex for cancel 1
│┌ Has acquired mutex for cancel 0
││ Will acquire mutex for send 2
│└ Will release mutex for cancel 0
└─ Has released mutex for cancel 0
   Will acquire mutex for cancel 2
   Will acquire mutex for cancel 3
!→ Has acquired mutex for send 0
   Will acquire mutex for send 3
   Will acquire mutex for cancel 4
   Will acquire mutex for send 4
   Will acquire mutex for cancel 5
   Will acquire mutex for send 5
   Will acquire mutex for cancel 6
   Will acquire mutex for send 6
   Will acquire mutex for cancel 7
   Will acquire mutex for send 7
   Will acquire mutex for cancel 8
```
I.e., the thread that printed `Has acquired mutex for send 0` always blocks after acquiring the mutex and never releases it, causing every other thread to lock up waiting on it.
From what I can see while debugging, the issue is an inconsistent acquisition order between the `Mutex` here and the internal lock that controls a task’s state:
Thread A

- Call `task.cancel()`
- (Internal) Acquire the task’s status record lock (source)
- (Internal) Call task cancellation handlers (source)
- (Internal) Call `AsyncStream._Storage`’s `withTaskCancellationHandler` handler (source)
- (Internal) Call `AsyncStream.Continuation.onTermination` (source)
  - This happens before clearing out the continuation state’s `continuations` array in `finish()` (source), so there is still a single continuation waiting to be resumed, since we’re iterating over the stream
- Block while we wait to acquire the `continuations` `Mutex` inside `onTermination`, which at this point is acquired by Thread B
Thread B

- Call `sendToAll(_:)`
- Acquire the `continuations` `Mutex` (while Thread A is performing cancel)
- Call `AsyncStream.Continuation.yield()` on one of the continuations whose task is in the process of being cancelled
- (Internal) Call into `AsyncStream._Storage.yield(_:)` (source)
- (Internal) Since Thread A has not cleared the set of continuations, `yield(_:)` removes the continuation (source)
- (Internal) Call `UnsafeContinuation.resume(returning:)` after queuing up an element to yield (source)
- (Internal) Call into `swift_continuation_resumeImpl` → `resumeTaskAfterContinuation` (source) → `flagAsAndEnqueueOnExecutor` (source) → `updateStatusRecord` (source) → `withStatusRecordLock` (source)
- (Internal) Block while we wait to acquire the task’s status record lock (source), which at this point is acquired by Thread A
I.e., even though the presented code shows only one mutex, the task’s internal status record lock is also in play and is acquired in the opposite order on the two threads. That is a classic lock-order inversion, but there’s no indication it would be a problem without digging into Swift’s internal code.
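Stripped of `AsyncStream`, this is the standard ABBA pattern. As a minimal sketch (with stand-in `NSLock`s, not the runtime’s actual locks), the inversion only disappears if every path agrees on one acquisition order; the point of this report is that the status record lock is internal, so user code has no way to impose that order:

```swift
import Foundation

// Illustrative stand-ins, not the real locks:
let statusRecordLock = NSLock()  // plays the task's internal status record lock
let continuationsLock = NSLock() // plays the user-level `continuations` Mutex

// The deadlock arises because the two paths take these in opposite orders:
//   cancel path: statusRecordLock -> continuationsLock
//   yield path:  continuationsLock -> statusRecordLock
// With a single agreed-upon order, concurrent callers cannot deadlock:
func withConsistentOrder(_ body: () -> Void) {
    statusRecordLock.lock(); defer { statusRecordLock.unlock() }
    continuationsLock.lock(); defer { continuationsLock.unlock() }
    body()
}

let group = DispatchGroup()
for _ in 0..<4 {
    group.enter()
    DispatchQueue.global().async {
        withConsistentOrder { /* critical section */ }
        group.leave()
    }
}
group.wait()
print("no deadlock")
```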
What is the expected behavior here? Of course, the implementation can be fixed by delaying the lock acquisition: wrapping `continuations.withLock` in a `Task { … }` inside the async stream’s `onTermination` adds an artificial suspension point. But should it really be my responsibility, as a user of this API, to work around that when, from my perspective, there’s no reason it needs to be called asynchronously?
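For reference, a minimal sketch of that workaround (the names mirror the code above; the final sleep is only there to let the cleanup `Task` run before inspecting the set). Because `onTermination` now returns before the `Mutex` is contended, the status record lock is released first and the inversion cannot occur:

```swift
import Synchronization

let continuations = Mutex(Set<AsyncStream<Void>.Continuation>())

func makeStream() -> AsyncStream<Void> {
    AsyncStream { continuation in
        continuation.onTermination = { _ in
            // Workaround: hop to a new Task so the termination handler
            // returns (releasing the task's status record lock) before
            // we try to take our own Mutex.
            Task {
                continuations.withLock { _ = $0.remove(continuation) }
            }
        }
        continuations.withLock { _ = $0.insert(continuation) }
    }
}

let consumer = Task { for await _ in makeStream() {} }
consumer.cancel()
// Give the cleanup Task a moment to run before inspecting the set.
try? await Task.sleep(for: .milliseconds(100))
print(continuations.withLock { $0.count })
```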