Deadlock With `AsyncStream`s and a Single `Mutex`

I’m encountering a deadlock with the following code (Swift 6.2.3), which uses a single Mutex and feels like it should work, since the acquires and releases are well-ordered.

import Synchronization

// Synchronous access to adding/removing continuations is a must here,
// so using a Mutex.
let continuations = Mutex(Set<AsyncStream<Void>.Continuation>())

func sendToAll(_ index: Int) {
    // Print statements just to see where it's getting deadlocked
    print("Will acquire mutex for send \(index)")
    continuations.withLock {
        print("Has acquired mutex for send \(index)")
        for continuation in $0 {
            continuation.yield()
        }
        print("Will release mutex for send \(index)")
    }
}

func makeTask(for index: Int) -> Task<Void, Never> {
    let stream = AsyncStream { continuation in
        continuation.onTermination = { _ in
            print("Will acquire mutex for cancel \(index)")
            continuations.withLock {
                print("Has acquired mutex for cancel \(index)")
                _ = $0.remove(continuation)
                print("Will release mutex for cancel \(index)")
            }
            print("Has released mutex for cancel \(index)")
        }

        continuations.withLock {
            _ = $0.insert(continuation)
        }
    }
    return Task {
        for await _ in stream {}
    }
}

let count = 100
let tasks = (0..<count).map(makeTask(for:))

await withTaskGroup { group in
    for index in 0..<count {
        group.addTask {
            tasks[index].cancel()
        }

        group.addTask {
            sendToAll(index)
        }
    }
}

print("END") // Never called

I want to add/remove AsyncStream continuations synchronously from any thread, so I’m using a mutex to do so. Cancelling tasks and yielding values to those continuations concurrently results in a state where continuation.yield() blocks its thread permanently, producing a deadlock. The output (nondeterministic; box-drawing annotations added by me) is:

┌─ Will acquire mutex for cancel 0
│  Will acquire mutex for send 0
│  Will acquire mutex for send 1
│  Will acquire mutex for cancel 1
│┌ Has acquired mutex for cancel 0
││ Will acquire mutex for send 2
│└ Will release mutex for cancel 0
└─ Has released mutex for cancel 0
   Will acquire mutex for cancel 2
   Will acquire mutex for cancel 3
!→ Has acquired mutex for send 0
   Will acquire mutex for send 3
   Will acquire mutex for cancel 4
   Will acquire mutex for send 4
   Will acquire mutex for cancel 5
   Will acquire mutex for send 5
   Will acquire mutex for cancel 6
   Will acquire mutex for send 6
   Will acquire mutex for cancel 7
   Will acquire mutex for send 7
   Will acquire mutex for cancel 8

I.e., the thread that printed Has acquired mutex for send 0 always blocks after acquiring the mutex without ever releasing it, so every other thread locks up waiting on it.


From what I can see while debugging, the issue seems to be an inconsistent acquisition order between the continuations Mutex and the internal lock that controls a task’s state:

Thread A

  1. Call task.cancel()
  2. (Internal) Acquire the task’s status record lock (source)
  3. (Internal) Call task cancellation handlers (source)
  4. (Internal) Call AsyncStream._Storage's withTaskCancellationHandler handler (source)
  5. (Internal) Call AsyncStream.Continuation.onTermination (source)
    1. This happens before clearing out the continuation state’s continuations array in finish() (source), so there is still a single continuation waiting to be resumed since we’re iterating over the stream
  6. Block while we wait to acquire the continuations Mutex inside onTermination, which at this point is acquired by Thread B

Thread B

  1. Call sendToAll(_:)
  2. Acquire the continuations Mutex (while Thread A is performing cancel)
  3. Call AsyncStream.Continuation.yield() on one of the continuations whose task is in the process of being cancelled
  4. (Internal) Call into AsyncStream._Storage.yield(_:) (source)
  5. (Internal) Since Thread A has not cleared the set of continuations, yield(_:) removes the continuation (source)
  6. (Internal) Call UnsafeContinuation.resume(returning:) after queuing up an element to yield (source)
  7. (Internal) Call into swift_continuation_resumeImpl → resumeTaskAfterContinuation (source) → flagAsAndEnqueueOnExecutor (source) → updateStatusRecord (source) → withStatusRecordLock (source)
  8. (Internal) Block while we wait to acquire the task’s status record lock (source), which at this point is acquired by Thread A

I.e., even though the presented code shows only one mutex, the lock controlling the task is also at play and is acquired in a different order on different threads, yet there’s no indication that this would be a problem without digging into Swift’s internal code.


What is the expected behavior here? Of course, my implementation can be fixed by delaying the lock acquisition: wrapping continuations.withLock in a Task { … } inside the stream’s onTermination introduces an artificial suspension point. But should it really be my responsibility, as a consumer of this API, to work around that when, from my perspective, there’s no reason it needs to be called asynchronously?
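Concretely, that workaround looks something like this (a sketch reusing the declarations from the example above; only onTermination changes):

```swift
import Synchronization

// Same declaration as in the original example.
let continuations = Mutex(Set<AsyncStream<Void>.Continuation>())

func makeTask(for index: Int) -> Task<Void, Never> {
    let stream = AsyncStream { continuation in
        continuation.onTermination = { _ in
            // Hop to a new task before touching the mutex, so the
            // cancelling thread never blocks on it while it is still
            // holding the task's internal status record lock.
            Task {
                continuations.withLock {
                    _ = $0.remove(continuation)
                }
            }
        }

        continuations.withLock {
            _ = $0.insert(continuation)
        }
    }
    return Task {
        for await _ in stream {}
    }
}
```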


you perhaps know this, but in general calling out to client code while holding a mutex can be risky as it invites this sort of problem. if, rather than yielding to the continuations while holding the mutex, you read out the current 'snapshot' of continuations and yielded outside the critical section, i think you might sidestep this particular issue (assuming that is a tolerable tradeoff for your use case of course).
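for instance, a sketch of that approach, reusing the declarations from your example (untested, and only one way to structure it):

```swift
import Synchronization

// same declaration as in the original example
let continuations = Mutex(Set<AsyncStream<Void>.Continuation>())

func sendToAll(_ index: Int) {
    // copy the current set while holding the mutex...
    let snapshot = continuations.withLock { $0 }
    // ...then yield outside the critical section, so resuming a
    // continuation can take the task's internal locks without this
    // thread holding the continuations mutex at the same time
    for continuation in snapshot {
        continuation.yield()
    }
}
```

the tradeoff, as noted, is that an observer added concurrently with a broadcast may or may not receive that particular event.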

i'll have to think about it more deeply to get a sense of whether the AsyncStream implementation may be exacerbating things here or if this is an unfortunate burden to be carried by clients. as to whether or not it is expected... there is this section of the task cancellation handler docs that suggests it is a known situation to be aware of:

Cancellation handlers which acquire locks must take care to avoid deadlock. The cancellation handler may be invoked while holding internal locks associated with the task or other tasks. Other operations on the task, such as resuming a continuation, may acquire these same internal locks. Therefore, if a cancellation handler must acquire a lock, other code should not cancel tasks or resume continuations while holding that lock.

granted, you're not directly calling that API when using AsyncStream, but as you discovered onTermination is invoked synchronously when the iterating Task is cancelled.


@jamieQ is 100% on the mark here: The continuation's yield method should be considered a client call-out that may contain arbitrary code; so locking around that is highly discouraged.


Looking into the AsyncStream code, I think this particular issue at least might benefit from re-ordering the process of clearing out the continuations and calling the onTermination handler. Once cancel() is called (and has successfully acquired the lock), any remaining continuations should functionally be dead at that point, not after waiting for onTermination to return. We’d still have to ensure we yield nil to those continuations after calling onTermination, but there’s no reason to keep them around in the state and allow another yield() to access them.

If instead AsyncStream._Storage clears out its continuations at the same time it clears out its onTermination handler, a concurrent call to yield() won’t attempt to acquire the task’s lock to resume said continuation, and only after the onTermination has returned will the continuations resume with a nil value.

Agreed, but it’s not obvious that this is a dangerous thing to do, because it’s not called out anywhere in the docs the way it is for withTaskCancellationHandler. Same for onTermination (that one at least more obviously has something to do with task cancellation and could be assumed to inherit the same semantics as withTaskCancellationHandler, but again it’s not spelled out and feels like a foot gun).


In addition to the needed documentation, I'd like to ask: what's the correct way to structure the example? Normally you'd move the work out from under the lock, e.g. by returning a copy of the set, but I don't know if that's actually correct in this case.

i think that sort of change would be reasonable and would probably make the code slightly easier to follow. it would also make stream termination more 'atomic' in the sense that it would only need to acquire its internal lock once. however, i've not convinced myself it would actually resolve this class of problem.

suppose we made that change – when the stream is cancelled it clears both the onTermination handler and the pending continuations atomically. it seems to me that the following sequence of events could occur (assume threads 'A' and 'B'):

  1. [A]: acquires continuations mutex in sendToAll()
  2. [A]: calls yield on the stream's continuation
  3. [A]: stream internals run up until just before the call to yield on the unsafe continuation (stream's internal lock is released at this point)
  4. [B]: cancel() invoked
  5. [B]: stream internals acquire its lock and clear out the onTermination handler
  6. [B]: stream internals release its lock and invoke onTermination handler
  7. [B]: handler invocation blocks awaiting acquisition of the continuations mutex

assuming this is possible, then IIUC we'd be in essentially the same state as you outlined above – the thread trying to yield() would be holding the continuations mutex, attempting to yield to a continuation, while the cancelling thread would hold the status record lock and be blocking on acquiring the continuations mutex.

i think this depends on the requirements. as it's implemented now, yielding to the set of 'observers' (let's call them) prevents that set from being altered until everything has been notified (or like... queued to be notified. exactly when the async resumption happens is largely an implementation detail). however, it doesn't stop concurrent code from trying to add a new observer into the set whilst delivery is ongoing – it will just block until delivery has completed. the world in which the current observer set is copied out from the mutex first and then notified is similar in that it also would not deliver events to an observer that was added during an ongoing broadcast.
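to make that semantic concrete, here's a sketch of the snapshot variant wrapped in a small type (the Broadcaster name and shape are my own invention, not anything from the stdlib):

```swift
import Synchronization

// snapshot-style broadcaster: the mutex only ever guards set mutations
// and the copy; no continuation is resumed while it is held, so the
// lock-order inversion from the original example cannot occur
final class Broadcaster: Sendable {
    private let continuations = Mutex(Set<AsyncStream<Void>.Continuation>())

    func makeStream() -> AsyncStream<Void> {
        AsyncStream { continuation in
            continuation.onTermination = { _ in
                self.continuations.withLock { _ = $0.remove(continuation) }
            }
            self.continuations.withLock { _ = $0.insert(continuation) }
        }
    }

    func send() {
        // copy out under the lock, yield outside it
        let snapshot = continuations.withLock { $0 }
        for continuation in snapshot {
            continuation.yield()
        }
    }
}
```

an observer added while send() is mid-broadcast may miss that particular event, as discussed above.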

I think you’re right. As long as the following are true, there’s always going to be the chance to deadlock:

  1. AsyncStream.Continuation.yield calls into UnsafeContinuation.resume synchronously.
  2. UnsafeContinuation.resume blocks to acquire the task’s status lock.
  3. withTaskCancellationHandler calls into AsyncStream._Storage.cancel() synchronously while holding onto the task lock.
  4. AsyncStream._Storage.cancel() calls the provided onTermination synchronously.

I’m willing to chalk this case of deadlock up to user error; all of these feel like they either have to be true, or changing them at this point would have some other undesirable consequences, and it’s up to me as the consumer of this API to work around these limitations.

But the problem is that AsyncStream's internals are opaque to consumers. In this case, AsyncStream.Continuation.onTermination inherits the behavior of withTaskCancellationHandler, while AsyncStream.Continuation.yield inherits the behavior of UnsafeContinuation.resume(). The withTaskCancellationHandler docs explicitly call that out as problematic (you linked to them above), but it’s not clear to me as a consumer of AsyncStream that I need to heed a warning in a seemingly unrelated API, since it’s not mentioned anywhere in AsyncStream's docs. If the behavior of AsyncStream, withTaskCancellationHandler, and UnsafeContinuation can’t be changed at this point, a note in the docs that onTermination inherits the same caveats as withTaskCancellationHandler would at least make the potential for deadlock, and how to avoid it, a little clearer.


here's a proposed documentation update (feedback welcome): [NFC][Concurrency]: add note about deadlock risk for AsyncStream.onTermination by jamieQ · Pull Request #87082 · swiftlang/swift · GitHub.


tangential to this particular issue, but when looking into this more, i think those docs could be further improved b/c they don't currently actually cover all the states that can result in that callback being executed (e.g. if the stream 'goes out of scope', etc.). but i left that for another day since writing is hard.
