Structured concurrency deadlock help needed

I've been experimenting with structured concurrency and I seem to have created a deadlock or race condition. I can't see what I'm doing wrong, and I'm not sure whether it is a silly error, a fundamental misunderstanding, or a beta 1 bug in macOS 12.0.

The full project can be found here: GitHub - josephlord/AsyncSequnceExtensions at 5e00bd7c83f9f85dc2fb627f663ecaf2970b5a2b

When I run the testAsyncSequencePublisherBasic test on the Mac (M1 Mac mini) it fails, apparently by deadlocking, about 20% of the time. I'm running in Xcode and using the "Run xxx Repeatedly" option by right-clicking on the test. It works in the iOS Simulator, but that may just be because one side consistently wins the race there. In the repo you will see the test timeout is only half a second, but longer timeouts make no difference.

The experiment I'm trying to do is to map from an AsyncSequence into a Combine publisher. Below the code snippet are the print outputs of a good and a bad run.


@available(iOS 15.0, macOS 12.0, *)
public struct AsyncSequencePublisher<AsyncSequenceType> : Publisher where AsyncSequenceType : AsyncSequence {
    public typealias Output = AsyncSequenceType.Element
    public typealias Failure = Error
    
    let sequence: AsyncSequenceType
    
    public init(_ sequence: AsyncSequenceType) {
        self.sequence = sequence
    }
    
    fileprivate actor ASPSubscription<S> : Subscription
    where S : Subscriber, S.Failure == Error, S.Input == AsyncSequenceType.Element {
        
        var demand: Subscribers.Demand = .none
        var taskHandle: Task.Handle<(), Never>?

        var demandUpdatedContinuation: CheckedContinuation<Void, Never>?
        
        private func mainLoop(seq: AsyncSequenceType, sub: S) {
            Swift.print("MainLoop")
            taskHandle = detach {
                do {
                    Swift.print("Loop")
                    for try await element in seq {
                        Swift.print("element: \(element)")
                        await self.waitUntilReadyForMore()
                        guard !Task.isCancelled else { return }
                        await self.setDemand(demand: sub.receive(element))
                    }
                    sub.receive(completion: .finished)
                } catch {
                    if error is Task.CancellationError { return }
                    sub.receive(completion: .failure(error))
                }
            }
            Swift.print("taskHandle set")
        }
        
        private func waitUntilReadyForMore() async {
            
            Swift.print("Wait on Demand")
            if demand > 0 {
                demand -= 1
                Swift.print("Demand OK - continue")
                return
            }
            
            let _: Void = await withCheckedContinuation { continuation in
                Swift.print("Set continuation")
                demandUpdatedContinuation = continuation
            }
        }
        
        nonisolated init(sequence: AsyncSequenceType, subscriber: S) {
            async {
                await mainLoop(seq: sequence, sub: subscriber)
            }
            Swift.print("init returned")
        }
        
        nonisolated func request(_ demand: Subscribers.Demand) {
            Swift.print("request: \(demand)")
            detach {
                Swift.print("Will await setDemand")
                await self.setDemand(demand: demand)
                Swift.print("Back from await setDemand")
            }
        }
        
        nonisolated func cancel() {
            detach {
                Swift.print("Cancel")
                await self.setCanceled()
                await self.demandUpdated()// Unblock so the main loop can hit the task cancellation guard
                
            }
        }
        
        private func demandUpdated() {
            demandUpdatedContinuation?.resume()
            demandUpdatedContinuation = nil
        }
        
        private func setCanceled() async {
            taskHandle?.cancel()
        }
        
        private func setDemand(demand: Subscribers.Demand) async {
            self.demand += demand
            guard demand > 0 else { return }
            demandUpdated()
        }
        
        deinit {
            taskHandle?.cancel()
            demandUpdatedContinuation?.resume()
        }
    }
    
    public func receive<S>(subscriber: S) where S : Subscriber, Error == S.Failure, AsyncSequenceType.Element == S.Input {
        let subscription = ASPSubscription(sequence: sequence, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
}

@available(iOS 15.0, macOS 12.0, *)
extension AsyncSequence {
    public var publisher: AsyncSequencePublisher<Self> {
        AsyncSequencePublisher(self)
    }
}

A successful run looks like:

init returned
request: unlimited
MainLoop
taskHandle set
Will await setDemand
Loop
Back from await setDemand
element: 1
Wait on Demand
Demand OK - continue
received 1
finished

The prints from an unsuccessful run look like this:

init returned
request: unlimited
MainLoop
Will await setDemand
Loop
taskHandle set
element: 1

It appears that both detached tasks (in mainLoop and request) get blocked from entering the Actor's controlled state (waitUntilReadyForMore and setDemand respectively) but I can't see what could be holding the actor indefinitely.

If "Wait on Demand" was printed I would suspect that I had misunderstood the effect of withCheckedContinuation, but I don't think it even gets that far.

At this point this is just experimental / learning code, I don't have an urgent need for an AsyncSequencePublisher so even if there is a built in or simpler equivalent I'd still like to understand what is going wrong here (I am interested in alternative approaches but not instead of an answer on this).

Any help gratefully received.

@Philippe_Hausler may want to have a look at this as it touches async sequences and combine.

The issue here, I think, has nothing to do with Combine per se and more to do with the continuation and actors. The isolated state of the actor prevents the continuation from making forward progress.

I am uncertain about the rules and context of continuations within actors, but I don't think it even reaches the withCheckedContinuation call, because in the unsuccessful run I don't see the print("Wait on Demand") happening. So it doesn't seem to be entering the function containing the continuation.
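For reference, here is a minimal sketch of the continuation-inside-an-actor pattern being discussed, written against the released Swift concurrency API rather than the beta 1 names. `DemandGate` and its methods are hypothetical names (not from the project), and the sketch assumes a single waiting consumer. What it tries to show is that an actor is reentrant: while `waitForDemand()` is suspended on the continuation, another task can still enter `add(_:)` and resume it.

```swift
// Hypothetical type for illustration only; assumes at most one waiter at a time.
actor DemandGate {
    private var demand = 0
    private var waiter: CheckedContinuation<Void, Never>?

    /// Consumes one unit of demand, suspending first if none is available.
    func waitForDemand() async {
        if demand > 0 {
            demand -= 1
            return
        }
        // Suspending here releases the actor, so add(_:) can run meanwhile.
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            waiter = continuation
        }
        demand -= 1
    }

    /// Adds demand and wakes the waiter, if there is one.
    func add(_ amount: Int) {
        demand += amount
        guard demand > 0 else { return }
        waiter?.resume()
        waiter = nil
    }
}

let gate = DemandGate()
let consumer = Task { () -> String in
    await gate.waitForDemand()
    return "resumed"
}
await gate.add(1)
let result = await consumer.value
print(result)
```

Whichever of the two tasks reaches the actor first, the consumer finishes: either it finds demand already present, or it suspends and is resumed by `add(_:)`.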

I think I've made some progress. I don't think the issue is the continuation at all (although I'm not quite sure I have enough understanding about the requirements for using them safely in an actor). The issue seems to be in the ASPSubscription (actor) init:

        nonisolated init(sequence: AsyncSequenceType, subscriber: S) {
            async {
                await self.mainLoop(seq: sequence, sub: subscriber)
                //Swift.print("await mainLoop returned")
            }
            Swift.print("init returned")
        }

If I either uncomment the "await mainLoop returned" print or change this from async to detach, it seems to work. I've not yet checked whether adding the print just affects the timing of the race or changes the generated code more fundamentally.

The detach is probably a sign I need to read some more. I was expecting that, in the context of a nonisolated function, async and detach would be essentially equivalent (beyond picking up any priorities if called from within a task).
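As a small illustration of what the two spellings inherit, here is a sketch using the names from the released API, where `Task { }` replaced the beta's `async { }` and `Task.detached { }` replaced `detach { }`. `Trace` and `spawnBoth()` are hypothetical names introduced only for this example. A task-local value is used because it is easier to observe deterministically than priority, but the same inheritance rule applies to both: `Task { }` copies context from the task that creates it, while `Task.detached` starts clean.

```swift
// Hypothetical namespace holding a task-local value for the demonstration.
enum Trace {
    @TaskLocal static var label = "none"
}

/// Creates one inheriting task and one detached task inside a scope
/// where the task-local is bound, and reports what each one observed.
func spawnBoth() async -> (String, String) {
    await Trace.$label.withValue("outer") {
        let inheritedTask = Task { Trace.label }          // copies task-locals
        let detachedTask = Task.detached { Trace.label }  // does not
        return (await inheritedTask.value, await detachedTask.value)
    }
}

let (inherited, detached) = await spawnBoth()
print(inherited, detached)
```

When called from synchronous code that is not running inside any task or actor, there is nothing to inherit, which is why the two might be expected to behave alike in the init above.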

It does seem to work when I make the init the following. Since 3.isZero should be very fast, this suggests that the statements after the await are affecting the generated code rather than just tipping the scales in the race:

        nonisolated init(sequence: AsyncSequenceType, subscriber: S) {
            async {
                await self.mainLoop(seq: sequence, sub: subscriber)
                if 3.isZero { Swift.print("await mainLoop returned") }
            }
            Swift.print("init returned")
        }

Whoop, this seems to seep into dangerous territory actually.
I believe such closing over self in synchronous initializers may actually be soon forbidden, right @kavon ?

It might be that this shouldn't be allowed, but I don't think it is the problem I'm seeing. Everything is already set up before the async call. When I refactor to use a static function instead, I still hit the same issue about 14 times in a hundred.

        private init() {}
        
        static func subscription(sequence: AsyncSequenceType, subscriber: S) -> ASPSubscription<S> {
            let subscription = ASPSubscription()
            async {
                await subscription.mainLoop(seq: sequence, sub: subscriber)
            }
            return subscription
        }

Again, using detach or having an operation after the await seems to resolve the issue, but I'm trying to understand, and be certain, whether it is a bug.
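For anyone reading this against a later toolchain, the detached variant of the static factory might look roughly like the sketch below in the released API. `Worker`, `start(with:)`, `snapshot()` and `make(values:)` are hypothetical names, and returning the startup task is an addition made here purely so the example can wait for it. This only shows the shape of the workaround; it does not demonstrate or explain the hang seen on beta 1.

```swift
// Hypothetical actor for illustration; not the ASPSubscription from the project.
actor Worker {
    private var events: [String] = []

    private init() {}

    // Actor-isolated setup, invoked from a task started by the factory.
    private func start(with values: [Int]) {
        events = values.map { "element: \($0)" }
    }

    func snapshot() -> [String] { events }

    /// Synchronous factory: creates the actor, then kicks off its setup
    /// from a detached task instead of an inheriting one.
    static func make(values: [Int]) -> (Worker, Task<Void, Never>) {
        let worker = Worker()
        let startup = Task.detached { await worker.start(with: values) }
        return (worker, startup)
    }
}

let (worker, startup) = Worker.make(values: [1, 2])
await startup.value
let events = await worker.snapshot()
print(events)
```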


I've now ruled out the continuation as the cause, using a slightly reduced test case (no longer a fully valid publisher) from which the continuation has been removed. I still get a similar failure rate of about 14 in 100 on an M1 Mac mini. On iOS Simulators, at least, I can't reproduce the issue.

There are three changes, any one of which seems to fix the issue. The first (which I couldn't do in reality while keeping it functional) is to avoid the await self.setDemand(...) in the mainLoop. This is a call back to the actor from a detached task.

The second option that seems to resolve it is to use a detached task in the static subscription function.

The third, which to me is the strangest and the most likely to indicate a compiler or OS bug, is to add any functionality after the await call that starts the main loop. It is almost as if, without it, the mainLoop doesn't reliably release its hold on the actor.

Apologies for it not being a fully minimal case yet, I tried to build up from nothing to achieve a similar flow but haven't succeeded yet.

@available(iOS 15.0, macOS 12.0, *)
public struct MinimisedDeadlockExample<AsyncSequenceType> : Publisher where AsyncSequenceType : AsyncSequence {
    public typealias Output = AsyncSequenceType.Element
    public typealias Failure = Error
    
    let sequence: AsyncSequenceType
    
    public init(_ sequence: AsyncSequenceType) {
        self.sequence = sequence
    }
    
    fileprivate actor ASPSubscription<S> : Subscription
    where S : Subscriber, S.Failure == Error, S.Input == AsyncSequenceType.Element {
        
        var taskHandle: Task.Handle<(), Never>?
        
        private func mainLoop(seq: AsyncSequenceType, sub: S) {
            Swift.print("MainLoop")
            taskHandle = detach {
                do {
                    Swift.print("Loop")
                    for try await element in seq {
                        Swift.print("element: \(element)")
                        Swift.print("Ready for more")
                        let addedDemand = sub.receive(element)
// If this await self.setDemand is removed it no longer deadlocks
                        await self.setDemand(demand: addedDemand)
                        Swift.print("mainLoop - loop end")
                    }
                    Swift.print("mainLoop - loop completion")
                    sub.receive(completion: .finished)
                } catch {
                    Swift.print("MainLoop - catch \(error.localizedDescription)")
                    if error is Task.CancellationError { return }
                    sub.receive(completion: .failure(error))
                }
            }
            Swift.print("taskHandle set")
        }
        
        
        private init() { }
        
        static func subscription(sequence: AsyncSequenceType, subscriber: S) -> ASPSubscription<S> {
            let subscription = ASPSubscription()
            async { // If this is detach instead of async it no longer deadlocks
                await subscription.mainLoop(seq: sequence, sub: subscriber)
//                if 3.isZero { Swift.print("If this is uncommented it no longer deadlocks") }
            }
            return subscription
        }
        
        nonisolated func request(_ demand: Subscribers.Demand) {
            Swift.print("request: \(demand)")
            detach {
                Swift.print("Will await setDemand")
                await self.setDemand(demand: demand)
                Swift.print("Back from await setDemand")
            }
        }
        
        nonisolated func cancel() {}
        
        private func setDemand(demand: Subscribers.Demand) async { Swift.print("setDemand") }
        
        deinit {
            Swift.print("deinit")
        }
    }
    
    public func receive<S>(subscriber: S) where S : Subscriber, Error == S.Failure, AsyncSequenceType.Element == S.Input {
        let subscription = ASPSubscription.subscription(sequence: sequence, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
}

@available(iOS 15.0, macOS 12.0, *)
extension AsyncSequence {
    public var deadlockPublisher: MinimisedDeadlockExample<Self> {
        MinimisedDeadlockExample(self)
    }
}
Full code and test code from Github

Link to code on Github

Link to test on Github
AsyncSequnceExtensions/MinimisedDeadlockTests.swift at deadlockExamples · josephlord/AsyncSequnceExtensions · GitHub

I wonder if I might have been hitting this issue (noted in the Xcode beta 2 release notes):

  • The async Task APIs in macOS, watchOS, and tvOS differ from the async Task APIs in iOS. As a result, Task API calls on Actor objects in multi-platform projects may cause async functions to hang at a suspension point. (79378627)