Easy to reproduce Combine crash involving Future and concurrency (+ memory leak)

I found a very easily reproducible crash involving Combine and Future. (And it leaks memory too.) I'm going to send it off as a complete project to Apple, but before I do, does anyone see anything obviously stupid I'm doing? (Xcode: 11.4.1).

(If you'd like a complete Xcode project: GitHub - davidbaraff/FutureCrash).

To explain the motivation: before Combine, we used a separate queue that would make a blocking HTTP call off the main thread. I wrote new non-blocking APIs using Combine, but wanted to keep our blocking API around by simply calling the new API and blocking on it. That's what the function blockTillCompletion does. Anyway, if anyone could take a look and tell me if you see something dumb, I'd appreciate it:

import Combine
import Foundation

extension Publisher {
  /// Blocks the calling thread until the publisher emits a value or fails.
  public func blockTillCompletion() throws -> Output {
    let semaphore = DispatchSemaphore(value: 0)
    var result: Output?
    var failure: Failure?
    var cancellable: Cancellable? = self.sink(receiveCompletion: { completion in
        switch completion {
        case .failure(let error):
            failure = error
            semaphore.signal()
        case .finished:
            ()   // the value handler below has already signaled
        }
    }) { value in
        result = value
        semaphore.signal()
    }
    _ = semaphore.wait(timeout: .distantFuture)
    cancellable = nil   // tear down the subscription once we have a result (or failure)
    
    guard let result = result else { throw failure! }
    return result
  }
}

Here's how to use it and get a crash:

public func pretendToQuery() -> Future<Data, Error>  {
    let future = Future<Data, Error> { promise in
        // Change 0.0001 to 0.001 to decrease the odds of it crashing, and
        // make it easier to watch the memory leak.
        DispatchQueue.main.asyncAfter(deadline: .now() + 0.0001) {
            let result = Data(count: 1024)
            promise(.success(result))
        }
    }
    return future
}

If you simply start several concurrent threads that do nothing but call pretendToQuery().blockTillCompletion() repeatedly, then after a few seconds (and several thousand calls) you get a crash. If you watch carefully, it leaks memory as well.

Here's code to just launch them in parallel:

private let _workQueue = DispatchQueue(label: "com.deb.work", attributes: .concurrent)
private func perpetualWorker(index: Int) {
    var ctr = 0
    while true {
        autoreleasepool {
            let f = pretendToQuery()
            let result = try! f.blockTillCompletion()
            if result.isEmpty {
                fatalError("Got back empty data")
            }
            ctr += 1
            if ctr % 100 == 0 {
                print("Worker [\(index)]: reached", ctr)
            }
        }
    }
}

private let maxWorkers = 8
public func startWorkers() {
    for i in 0..<maxWorkers {
        _workQueue.async {
            perpetualWorker(index: i)
        }
    }
}

It's not unique to Future. I tried rewriting it to use a PassthroughSubject and it also crashes.

Basically, at the point in time where the publisher chain gets torn down, you eventually get a crash.

This framework seems a bit fragile in the face of aggressive multithreading.

I will answer my own questions, in case anyone ever reads this.

You have to serialize calls that send values with calls that cancel, i.e. it appears you must be in complete control of which thread tears down a publisher.

Exploring various techniques to avoid the crash, I've discovered that even if you manage to avoid it (either by luck, or by properly serializing access to the publisher), and let the launcher function with the autorelease pool return, Combine still leaks memory.

I.e. if you return from the perpetualWorker() function after, say, 25,000 iterations, you've still leaked memory that is never reclaimed.
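
For example, this is roughly what I mean (a bounded variant of the worker above, not new project code):

// Bounded variant of perpetualWorker(): run a fixed number of iterations, then return.
private func boundedWorker(index: Int) {
    for ctr in 1...25_000 {
        autoreleasepool {
            let f = pretendToQuery()
            let result = try! f.blockTillCompletion()
            if result.isEmpty {
                fatalError("Got back empty data")
            }
        }
        if ctr % 100 == 0 {
            print("Worker [\(index)]: reached", ctr)
        }
    }
    // The worker has returned, but the memory attributed to those Futures is never reclaimed.
}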

I cannot find any solution, even with totally serialized access, that avoids this thing crashing after a while. And it leaks memory badly.

I'm having serious doubts that this framework is tested well enough to be usable in real-world applications with aggressive multithreading.

I ran your code and tinkered with it some. I get the crashing and apparent memory leak too, and I don't understand it. If you find an answer, I'd like to know.

I've figured out how to avoid the crash. It seems necessary to use a single serial queue to serialize:
(1) subscribing
(2) cancellation
(3) the send() operations

I.e. if I do all of the above within the same serial dispatch queue, there's no crash. However, if I use a Future it still leaks like crazy; if I use a PassthroughSubject it does not. I can understand that subscribing and cancellation must be serialized with respect to each other, and I suppose that doing a send() in the middle of subscribing might be problematic as well. A sketch of what I mean is below.
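
Roughly, what I ended up doing looks like this (an illustrative sketch only, not the actual project code):

import Combine
import Foundation

// Every interaction with the subject -- subscribing, sending, and cancelling --
// is funneled through one serial queue.
private let publisherQueue = DispatchQueue(label: "com.deb.publisher")   // serial

func serializedBlockingQuery() throws -> Data {
    let subject = PassthroughSubject<Data, Error>()
    let semaphore = DispatchSemaphore(value: 0)
    var result: Data?
    var failure: Error?
    var cancellable: AnyCancellable?

    // (1) Subscribe on the serial queue.
    publisherQueue.sync {
        cancellable = subject.sink(receiveCompletion: { completion in
            if case .failure(let error) = completion {
                failure = error
            }
            semaphore.signal()
        }, receiveValue: { value in
            result = value
        })
    }

    // (3) Send on the same serial queue (simulating the deferred HTTP reply).
    publisherQueue.asyncAfter(deadline: .now() + 0.0001) {
        subject.send(Data(count: 1024))
        subject.send(completion: .finished)
    }

    // Block the caller; this must not run on publisherQueue itself or it deadlocks.
    semaphore.wait()

    // (2) Cancel / tear down on the same serial queue.
    publisherQueue.sync {
        cancellable?.cancel()
        cancellable = nil
    }

    guard let result = result else { throw failure! }
    return result
}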

I've updated my git project (GitHub - davidbaraff/FutureCrash). It no longer crashes, but it leaks badly.

The current version has this line:

let f = pretendToQueryUsingFuture(q)

and although the app won't crash, it will burn through memory quite quickly.
If you change that one line to

let f = pretendToQuery(q)

then it will switch to using a PassthroughSubject, won't crash, and won't burn through memory.
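
For reference, the PassthroughSubject variant has essentially this shape (paraphrased from memory, not a verbatim copy of what's in the project):

// Approximate shape of the PassthroughSubject variant (paraphrased, not verbatim
// from the project). The queue passed in is the same serial queue used for
// subscribing and cancellation, so the subscribe happens before the deferred send.
public func pretendToQuery(_ queue: DispatchQueue) -> AnyPublisher<Data, Error> {
    let subject = PassthroughSubject<Data, Error>()
    queue.asyncAfter(deadline: .now() + 0.0001) {
        subject.send(Data(count: 1024))
        subject.send(completion: .finished)
    }
    return subject.eraseToAnyPublisher()
}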

This thread may be useful.

I don't see any problem in your original code that should be causing it to crash. I guess I'll wait a while to see if someone can explain it, before I send it to Apple as a bug report.

Thanks for the report, we'll look into it.

(1) subscribing

I'm pretty sure this intentionally isn't thread safe. Instead, we're supposed to use the subscribe(on:) operator to control the queue on which the subscription happens.
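
Something along these lines (illustrative only; the names are made up):

// Using subscribe(on:) to control where subscription (and cancellation) work happens.
let subscriptionQueue = DispatchQueue(label: "com.example.subscription")

let cancellable = pretendToQuery()
    .subscribe(on: subscriptionQueue)    // upstream subscription work runs here
    .receive(on: DispatchQueue.main)     // values and completion are delivered here
    .sink(receiveCompletion: { _ in },
          receiveValue: { data in print("Received \(data.count) bytes") })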

(2) cancellation

It does seem like this should always be thread safe.

(3) the send() operations

Do you mean receive? This is supposed to be thread safe as well.

So while many of these are supposed to be thread safe, I'm not sure they're designed to be reentrant (i.e. simultaneous calls on the same instance before the previous call completes).

(3) the send() operations

No I meant send, as in the .send() method of a PassthroughSubject.

Do you mean receive? This is supposed to be thread safe as well.

So while many of these are supposed to be thread safe, I'm not sure they're designed to be reentrant (i.e. simultaneous calls on the same instance before the previous call completes).

Sorry, that sounds like saying it's not thread safe, in the sense that the cure for making sure you don't call it again before a previous call has completed is wrapping the operation in a lock/mutex.

Right, I would expect send to be thread safe in that case too. But...

Reentrancy and thread safety are two different things, with reentrancy often far harder to implement and with a much higher performance cost. For instance, to make send reentrant, not only would you have to lock the internal data structures of the subject, but you'd have to maintain this lock while the subject notifies all of its observers. Obviously this would have a huge impact on the performance of subjects and make them subject to the performance issues of observers themselves. There are ways to mitigate this, of course, like making observations 100% asynchronous, but that has its own costs and tradeoffs.
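
To make the distinction concrete, this is the kind of reentrancy I mean (a toy example, nothing to do with the project):

import Combine

// Toy example of reentrancy: send() is invoked again from inside a subscriber,
// before the outer send() has finished notifying. (The multithreaded analogue is
// two threads calling send() on the same subject at the same time.)
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject.sink { value in
    print("received \(value)")
    if value < 3 {
        subject.send(value + 1)   // re-enters send() while the outer call is still active
    }
}
subject.send(0)
_ = cancellable   // keep the subscription alive for the example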

In general, few libraries are truly reentrant, and I'm pretty sure Rx libraries are designed with particular threading requirements in mind. You could test RxSwift or ReactiveSwift to see how they handle reentrancy. I'm not an expert though, so it would be great if someone with more Rx experience could clarify, or if @Tony_Parker could outline what the requirements of Combine are in this regard.

To be honest, at this point I'm actually far more curious about why the current version of my project on GitHub leaks memory so badly with Future, but not with PassthroughSubject.

Given that Future is (I think?) a class, I assume the Combine framework has managed to create a retain cycle somewhere that's keeping these Futures all alive, but I sure don't understand why.
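
One crude way I suppose I could test that theory is to hold the Future weakly and see whether it ever deallocates (sketch only; run it off the main thread, since pretendToQuery() fulfills its promise on the main queue):

// Crude check of the retain-cycle theory: hold the Future weakly and see whether
// it is ever deallocated after the blocking call returns.
func checkFutureLifetime() {
    weak var weakFuture: Future<Data, Error>?
    autoreleasepool {
        let f = pretendToQuery()
        weakFuture = f
        _ = try? f.blockTillCompletion()
    }
    print(weakFuture == nil ? "Future was released" : "Future is still alive (leaked?)")
}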


Xcode's memory debugger may help you see why.

I tried Instruments with the Leaks instrument, but failed to figure it out.

I just created a report with feedback id FB7701628. It's focused on the crash, not the memory leak.

Thanks @robnik. It may come back as a duplicate to you but we're tracking the issue. I really appreciate the reduced test case.


Would it be helpful to have a simple test that demonstrates the memory leak, separate from the crash? I’m more concerned about the memory leak with Futures at the moment.