Unable to fix deadlock when using Combine

I was testing a library I created for thread safety and ran into a really frustrating deadlock. After a lot of work, I've isolated the code that causes it into the snippet below. It is a minimal reproducible example, not the full version.

The testDeadlock() function is the entry point. It calls refreshTokensConcurrently() up to 10,001 times (the loop runs over 0...10_000). The deadlock occurs during one of these invocations. It could be the first invocation or the 400th; the behavior is unpredictable.

The deadlock occurs at the line print(authorizationManager), which is inside the sink for authorizationManager.didChange.

Here are the last few lines of output in the console before the deadlock:

access token IS expired
i: 13; j: 18
authorizationManager.didChange sink: receive value: (())
i: 12; j: 14
WILL print authorizationManager
AuthorizationManager.description: BEFORE queue

For some reason, the call to dispatchQueue.sync in AuthorizationManager.description is blocking indefinitely.

Here's another really strange finding: if I remove the following print statement in AuthorizationManager.refreshTokens():

print("access token IS expired")

Then the deadlock does NOT occur.

What's going on here? What am I doing wrong?

import Foundation
import Combine

class AuthorizationManager {
    
    public var expirationDate: Date
    
    let dispatchQueue = DispatchQueue(label: "AuthorizationManager")
    
    let didChange = PassthroughSubject<Void, Never>()
    
    init() {
        self.expirationDate = Date()
    }
    
    func refreshTokens() -> AnyPublisher<Void, Error> {
        
        return dispatchQueue.sync {
            
            print("access token IS expired")
            
            return Result<Void, Error>
                .Publisher(())
                // this is meant to simulate a network request
                .delay(for: 0.1, scheduler: DispatchQueue.global())
                .map {
                    self.dispatchQueue.sync {
                        self.expirationDate = Date().addingTimeInterval(100_000)
                    }
                    self.didChange.send()
                }
                .eraseToAnyPublisher()
            
        }
        
    }
       
}

extension AuthorizationManager: CustomStringConvertible {
    
    var description: String {
        dispatchPrecondition(
            condition: .notOnQueue(dispatchQueue)
        )
        print("AuthorizationManager.description: BEFORE queue")
        return dispatchQueue.sync {
            print("AuthorizationManager.description: INSIDE queue")
            let dateString = self.expirationDate.description(with: .current)
            return """
                AuthorizationManager(
                    expirationDate: "\(dateString)"
                )
                """
        }
    }

}


let authorizationManager = AuthorizationManager()

/// The entry point.
func testDeadlock() {
 
    for i in 0...10_000 {
        print("\n--- \(i) ---\n")
        authorizationManager.expirationDate = Date().addingTimeInterval(-1)
        refreshTokensConcurrently()
    }
    
}
    
func refreshTokensConcurrently() {

    var cancellables: Set<AnyCancellable> = []
    
    authorizationManager.didChange
        .print("authorizationManager.didChange sink")
        .sink(receiveValue: {
            dispatchPrecondition(
                condition: .notOnQueue(authorizationManager.dispatchQueue)
            )
            print("WILL print authorizationManager")
            // MARK: - Deadlock Occurs Here -
            print(authorizationManager)
            print("DID print authorizationManager")
        })
        .store(in: &cancellables)
        
    let group = DispatchGroup()
    
    
    let internalQueue = DispatchQueue(
        label: "testDeadlock: internal queue"
    )
    
    let concurrentQueue = DispatchQueue(
        label: "testDeadlock concurrent queue"
    )
    concurrentQueue.sync {
        DispatchQueue.concurrentPerform(iterations: 20) { i in
            for j in 0..<20 {
                print("i: \(i); j: \(j)")
                group.enter()
                dispatchPrecondition(
                    condition: .notOnQueue(authorizationManager.dispatchQueue)
                )
                let cancellable = authorizationManager
                    .refreshTokens()
                    .sink { _ in
                        group.leave()
                    }
                
                internalQueue.async {
                    cancellables.insert(cancellable)
                }
                    
            }
        }
    }
    
    print("waiting for tokens to be refreshed")
    group.wait()
    print("finished waiting")

}

Let me help simplify the code. I managed to take Combine out of the equation. This seems to be a problem with Dispatch.

import Dispatch

class AuthorizationManager: CustomStringConvertible {
    let dispatchQueue = DispatchQueue(label: "AuthorizationManager")

    var description: String {
        dispatchQueue.sync { "Desc" }
    }

    func refreshTokens() {
        dispatchQueue.sync {
            print("Refreshing")
            DispatchQueue.global().async {
                print(self)
            }
        }
    }
}

let authorizationManager = AuthorizationManager()

for _ in 0..<40 {
    authorizationManager.refreshTokens()
}

print("Finished")

Now here's the interesting part. If I replace print(self) with print(dispatchQueue.sync { "desc" }) (which does the same work), the deadlock disappears. :scream:


So far, what I've figured out is that the deadlock needs a print call inside a dispatchQueue block in refreshTokens, plus the same dispatchQueue being used inside description. The queue arrangement also needs to be in this very specific order.

So I think what you can do for now is specify the downstream queue for didChange.

A couple of ideas:

  1. Does the same issue occur if you replace your simulated network request with a real network request, perhaps by making a large number of connections to a web server on localhost?
  2. Does the issue occur if the first line in refreshTokens() uses dispatchQueue.async instead of sync?

I think what you might be running into is the large number of concurrent tasks running on DispatchQueue.global(). The global queues use unbounded "overcommit" behaviour to spin up new threads when the existing worker threads are busy. This can result in a live-lock situation where the process hangs completely after spawning too many threads.

I just tried (with my code above). Same.

Also, going by my code above, it happens with as few as 40 work items.

Thank you for taking the time to look over my code, Adam.

Yes it does.

I can't test that because I need to use the return value for dispatchQueue.sync as the return value for the refreshTokens() method. This isn't possible for dispatchQueue.async because it cannot return any values back to the caller due to the fact that it executes asynchronously.
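To make the sync/async difference concrete, here is a minimal sketch. The queue label and the function names currentValueSync and currentValueAsync are made up for illustration and are not from the library in question. With sync the caller waits and gets the closure's result back; with async the result can only arrive later through something like a completion handler.

```swift
import Dispatch

// Hypothetical serial queue, for illustration only.
let queue = DispatchQueue(label: "example.serial")

// sync blocks the caller until the closure finishes,
// so the closure's result can be returned directly.
func currentValueSync() -> Int {
    return queue.sync { 42 }
}

// async returns immediately; the result can only be
// delivered later, here through a completion handler.
func currentValueAsync(completion: @escaping (Int) -> Void) {
    queue.async { completion(42) }
}

let syncValue = currentValueSync()

// Wait for the asynchronous result so it can be printed below.
let done = DispatchSemaphore(value: 0)
var asyncValue = 0
currentValueAsync { value in
    asyncValue = value
    done.signal()
}
done.wait()

print(syncValue, asyncValue)
```

Switching refreshTokens() to async would therefore mean changing its signature, which is why it can't be tested as a drop-in replacement.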

Could you synchronize the didChange downstream, specifically to main?

authorizationManager.didChange
  .receive(on: DispatchQueue.main) /// Like this
  .print("authorizationManager.didChange sink")
  ...

That might be my best option. I tested it out in the full version of the code and it works. Edit: Actually, this doesn't work.

Another strange finding is that replacing

print(authorizationManager)

with

someString = "\(authorizationManager)"

also prevents the deadlock. This may have something to do with the fact that the print function is synchronized.
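If print really is synchronized, one way this could go wrong is a lock-order inversion. The sketch below only models that hypothesis and rests on an assumption this thread does not confirm: that print holds an internal output lock while it asks the value for its description. Here stateLock stands in for AuthorizationManager.dispatchQueue and outputLock stands in for that assumed lock; both names are hypothetical. It uses try() instead of lock() so that it reports the conflict rather than hanging.

```swift
import Foundation

// Stand-ins (assumptions for illustration):
// stateLock  ~ AuthorizationManager.dispatchQueue
// outputLock ~ a lock that print is assumed to hold while writing
let stateLock = NSLock()
let outputLock = NSLock()

// Semaphores that force the problematic interleaving deterministically.
let aHoldsState = DispatchSemaphore(value: 0)
let bHoldsOutput = DispatchSemaphore(value: 0)
let aTried = DispatchSemaphore(value: 0)
let bTried = DispatchSemaphore(value: 0)

var aGotOutput = true
var bGotState = true
let group = DispatchGroup()

// Thread A: "a block on the queue that calls print".
DispatchQueue.global().async(group: group) {
    stateLock.lock()
    aHoldsState.signal()
    bHoldsOutput.wait()             // B already holds outputLock
    aGotOutput = outputLock.try()   // a blocking lock() would never return
    if aGotOutput { outputLock.unlock() }
    aTried.signal()
    bTried.wait()                   // keep stateLock until B has tried too
    stateLock.unlock()
}

// Thread B: "print calling description, which needs the queue".
DispatchQueue.global().async(group: group) {
    outputLock.lock()
    bHoldsOutput.signal()
    aHoldsState.wait()              // A already holds stateLock
    bGotState = stateLock.try()
    if bGotState { stateLock.unlock() }
    bTried.signal()
    aTried.wait()                   // keep outputLock until A has tried too
    outputLock.unlock()
}

group.wait()
print("A got outputLock:", aGotOutput, "| B got stateLock:", bGotState)
```

Both attempts report false: each thread holds the lock the other one needs. With blocking calls instead of try(), those two threads would wait on each other forever. Building the string with "\(authorizationManager)" first would avoid this under the same assumption, because description would then run before print takes its lock.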

I'm not sure I fully understand what you're saying here. Could you please elaborate?

Also, there's one more thing I should've mentioned beforehand: In the full version of the code, AuthorizationManager.dispatchQueue is private.

I kept removing components until the deadlock disappeared (after all, serializing everything won't deadlock). I found that for a deadlock to happen, you need to repeatedly and concurrently do the following:

  1. You run a block in dispatchQueue,
  2. The block prints anything,
  3. The block runs print(X) on a separate concurrent queue, where
    • X is CustomStringConvertible that uses dispatchQueue inside description.

#3 is extremely peculiar. As you've mentioned, it needs to be exactly print(X); any other variation makes the deadlock disappear, including

print(X.description)
print("\(X)")
_ = "\(X)"

let desc = "\(X)"
print(desc)

This makes me think there might be some problem with the compiler.
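The variants listed above that avoid the hang all build the string before print is called. Here is a minimal sketch of that workaround applied to the simplified reproducer. The group parameter is a hypothetical addition so the script waits for the asynchronous prints before exiting. It relies only on the behavior reported in this thread, not on a confirmed explanation.

```swift
import Dispatch

final class AuthorizationManager: CustomStringConvertible {
    let dispatchQueue = DispatchQueue(label: "AuthorizationManager")

    var description: String {
        dispatchQueue.sync { "Desc" }
    }

    // `group` is a hypothetical addition so the caller can wait
    // for all of the asynchronous prints to finish.
    func refreshTokens(group: DispatchGroup) {
        dispatchQueue.sync {
            print("Refreshing")
            DispatchQueue.global().async(group: group) {
                // Build the string first; print then receives a plain
                // String and never calls description itself.
                let text = "\(self)"
                print(text)
            }
        }
    }
}

let authorizationManager = AuthorizationManager()
let group = DispatchGroup()

for _ in 0..<40 {
    authorizationManager.refreshTokens(group: group)
}

group.wait()
print("Finished")
```

This sidesteps the hang without explaining it, so it is a stopgap rather than a fix.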

PS

In all seriousness, I planned to suggest that you use print("\(X)") just to confuse you even further :smiling_imp:

Actually, it turns out that receiving on DispatchQueue.main does not prevent deadlocks. Furthermore, I tested out other locking mechanisms, including NSLock, NSRecursiveLock and even this fancy implementation from Apple, and I experienced the exact same issues.
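For readers following along, swapping the serial queue for NSLock might look roughly like the sketch below. The class name LockedAuthorizationManager and the stored property name are hypothetical; the actual code that was tested is not shown in this thread. The point is that description still has to acquire the lock, just as the queue-based version has to enter the queue, which fits the report that changing the locking mechanism alone made no difference.

```swift
import Foundation

// Hypothetical NSLock-based variant; the names are illustrative only.
final class LockedAuthorizationManager: CustomStringConvertible {
    private let lock = NSLock()
    private var storedExpirationDate = Date()

    var expirationDate: Date {
        get {
            lock.lock()
            defer { lock.unlock() }
            return storedExpirationDate
        }
        set {
            lock.lock()
            defer { lock.unlock() }
            storedExpirationDate = newValue
        }
    }

    // description still takes the lock (via expirationDate) to read
    // the date, mirroring dispatchQueue.sync in the original version.
    var description: String {
        let date = expirationDate
        return "LockedAuthorizationManager(expirationDate: \(date.timeIntervalSince1970))"
    }
}

let manager = LockedAuthorizationManager()
manager.expirationDate = Date(timeIntervalSince1970: 100)

// Build the string before printing, as in the variants reported to work.
let text = "\(manager)"
print(text)
```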


Should I submit a bug report? @Lantua When I tried your simplified version and paused execution when the deadlock occurred, there were only 2 threads running! This makes me very skeptical of the notion that it is an issue with too many threads running.

I also find it interesting that you were able to reproduce this issue even without using DispatchQueue.concurrentPerform.

Please do. AFAICT, that reproducer should be valid, non-deadlocking code. It seems to consistently get stuck anywhere between 2 and ~30 work items, so I put 40 there for good measure. For GCD, 40 async work items are essentially nothing.

Even with the receive-on-main change, I wasn't sure whether it removed the deadlock (apparently it doesn't) or just shifted the timing to make it unlikely.

Ok, I filed a bug: https://bugs.swift.org/browse/SR-13628
