Easy to reproduce Combine crash involving Future and concurrency (+ memory leak)

Exploring various techniques to avoid the crash, I've discovered that even if you manage to avoid it (either by luck, or by properly serializing access to the publisher), and let the launcher function with the autorelease pool return, Combine still leaks memory.

I.e. if you return from the perpetualWorker() function after, say, 25,000 iterations, you've still leaked memory which never gets returned.

I cannot find any solution, even with totally serializing access, that avoids this thing crashing after a while. And it leaks memory badly.

I'm having serious doubts that this framework is tested enough to be usable in real-world applications with aggressive multithreading.

I ran your code and tinkered with it some. I get the crashing and apparent memory leak too, and I don't understand it. If you find an answer, I'd like to know.

I've figured out how to avoid the crashing. It seems necessary to use a queue to serialize:
(1) subscribing
(2) cancelation
(3) the send() operations

I.e. if I do all of the above within the same serial dispatch queue, no crash. However, if I use a Future it still leaks like crazy. If I use a PassthroughSubject it does not. I can understand that both subscribing and cancelation must be serialized with respect to each other. I suppose also that doing a send() while in the middle of subscribing might be problematic as well.
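A minimal sketch of the serialization described above, assuming a PassthroughSubject and a hypothetical serial queue named `serial` (neither name comes from the FutureCrash project). It only illustrates funneling subscribe, cancel, and send() through one queue; it is not the project's actual code.

```swift
import Combine
import Dispatch

// Hypothetical demo: every subscribe, cancel, and send() runs on one serial queue.
func runSerializedDemo() -> [Int] {
    let serial = DispatchQueue(label: "example.combine.serial")
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []
    var cancellable: AnyCancellable?

    // (1) subscribing
    serial.sync {
        cancellable = subject.sink { received.append($0) }
    }

    // (3) send(), issued from many threads but funneled through the serial queue
    DispatchQueue.concurrentPerform(iterations: 100) { i in
        serial.sync { subject.send(i) }
    }

    // (2) cancelation
    serial.sync {
        cancellable?.cancel()
        cancellable = nil
    }

    // A send after cancelation is not delivered to the sink.
    serial.sync { subject.send(-1) }

    return received
}
```

Because the sink closure runs synchronously inside each `serial.sync` block, `received` is only ever touched on the serial queue.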

I've updated my git project (GitHub - davidbaraff/FutureCrash). It no longer crashes, but it leaks badly.

The current version has this line:

let f = pretendToQueryUsingFuture(q)

and although the app won't crash, it will burn through memory quite quickly.
If you change that one line to

let f = pretendToQuery(q)

then it will switch to using a PassthroughSubject, won't crash, and won't burn through memory.

This thread may be useful.

I don't see any problem in your original code that should be causing it to crash. I guess I'll wait a while to see if someone can explain it, before I send it to Apple as a bug report.

Thanks for the report, we'll look into it.

(1) subscribing

I'm pretty sure this is intentionally not thread safe. Instead, we're supposed to use the subscribe(on:) operator to control the queue on which the subscription happens.

(2) cancelation

It does seem like this should always be thread safe.

(3) the send() operations

Do you mean receive? This is supposed to be thread safe as well.

So while many of these are supposed to be thread safe, I'm not sure they're designed to be reentrant (i.e. simultaneous calls on the same instance before the previous call completes).
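For point (1), a rough sketch of what using subscribe(on:) might look like. The queue name `subscriptionQueue` and the function `makeDeferredQuery` are hypothetical; the Future here just resolves immediately with a placeholder value. subscribe(on:) asks Combine to perform subscribe, cancel, and request operations on the given scheduler.

```swift
import Combine
import Dispatch

// Hypothetical queue on which subscribe/cancel/request operations are performed.
let subscriptionQueue = DispatchQueue(label: "example.subscription")

// Deferred delays creating the Future until subscription time,
// so the Future's closure also runs on subscriptionQueue.
func makeDeferredQuery() -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            promise(.success(42)) // placeholder result
        }
    }
    .subscribe(on: subscriptionQueue)
    .eraseToAnyPublisher()
}
```

Whether this alone avoids the crash discussed in this thread is not something the sketch demonstrates; it only shows where the operator goes.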

(3) the send() operations

No I meant send, as in the .send() method of a PassthroughSubject.

Do you mean receive? This is supposed to be thread safe as well.

So while many of these are supposed to be thread safe, I'm not sure they're designed to be reentrant (i.e. simultaneous calls on the same instance before the previous call completes).

Sorry, that sounds like saying it's not thread safe: the cure for making sure you don't call again before a previous call has completed is wrapping the operation in a lock/mutex.

Right, I would expect send to be thread safe in that case too. But...

Reentrancy and thread safety are two different things, with reentrancy often far harder to implement and with a much higher performance cost. For instance, to make send reentrant, not only would you have to lock the internal data structures of the subject, but you'd have to maintain this lock while the subject notifies all of its observers. Obviously this would have a huge impact on the performance of subjects and make them subject to the performance issues of observers themselves. There are ways to mitigate this, of course, like making observations 100% asynchronous, but that has its own costs and tradeoffs.
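To make the tradeoff concrete, here is a toy sketch. `TinySubject` is hypothetical and is not how Combine implements its subjects. `send` protects only the observer list, while `sendHoldingLock` keeps the lock for the whole notification pass, which is the costly variant described above.

```swift
import Foundation

// Hypothetical minimal subject, for illustration only.
final class TinySubject<Value> {
    // Recursive so an observer can call back into the subject on the same thread.
    private let lock = NSRecursiveLock()
    private var observers: [(Value) -> Void] = []

    func observe(_ observer: @escaping (Value) -> Void) {
        lock.lock()
        defer { lock.unlock() }
        observers.append(observer)
    }

    // Protects internal state only: the list is copied under the lock and
    // observers run outside it, so concurrent sends can interleave deliveries.
    func send(_ value: Value) {
        lock.lock()
        let snapshot = observers
        lock.unlock()
        snapshot.forEach { $0(value) }
    }

    // Holds the lock while notifying: deliveries from different threads are
    // fully serialized, but every sender now waits on the slowest observer.
    func sendHoldingLock(_ value: Value) {
        lock.lock()
        defer { lock.unlock() }
        observers.forEach { $0(value) }
    }
}
```

With the second variant, one slow observer stalls every other thread calling into the subject, which is the performance concern raised above.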

In general, few libraries are truly reentrant, and I'm pretty sure Rx libraries are designed with particular threading requirements in mind. You could test RxSwift or ReactiveSwift to see how they handle reentrancy. I'm not an expert though, so it would be great if someone with more Rx experience could clarify, or if @Tony_Parker could outline what the requirements of Combine are in this regard.

To be honest, at this point in time, I'm actually far more curious about why the current version of my project on GitHub leaks memory so badly with Future, but not with PassthroughSubject.

Given that Future is (I think?) a class, I assume that the Combine framework has managed to create a retain cycle somewhere that's keeping all these Futures alive, but I sure don't understand why.

Xcode's memory debugger may help you see why.

I tried Instruments with the Leaks tool, but failed to figure it out.

I just created a report with feedback id FB7701628. It's focused on the crash, not the memory leak.

Thanks @robnik. It may come back as a duplicate to you but we're tracking the issue. I really appreciate the reduced test case.

Would it be helpful to have a simple test that demonstrates the memory leak, separate from the crash? I’m more concerned about the memory leak with Futures at the moment.

To anyone who makes it this far:
So, it's nearly a year later. On Xcode 12.4, the memory leak is gone. The code in my very first post, which doesn't use a DispatchQueue to serialize calling sink, cancel, or send(), still crashes with Future, but not with PassthroughSubject. However, I now believe it is proper to use DispatchQueues to serialize these operations, as previous posters strongly suggested. The fact that only Future seems to require it is likely dumb luck.

So I think the real issue was the memory leak, and that's fixed.

I had run into the Future deinit crash and was losing my mind until I found this thread, so thanks for having such a detailed back and forth.

The demo code above helped me to set up the crash scenario. Here's an alternate solution for a ReplayOnce publisher that behaves the same as a future but without the crash, backing it with a simple CurrentValueSubject.

import Combine
import Foundation

public extension Publishers {

    typealias ReplayPromise<Output, Failure: Error> = (Result<Output, Failure>) -> Void

    final class ReplayOnce<Output, Failure: Error>: Publisher {

        private let currentValue = CurrentValueSubject<Output?, Failure>(nil)

        private let outputValue: AnyPublisher<Output, Failure>

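        // Foundation's NSRecursiveLock has no init(name:); set the name after creation.
        private let dataLock: NSRecursiveLock = {
            let lock = NSRecursiveLock()
            lock.name = "ReplayOnce:data_lock"
            return lock
        }()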

        public init(_ handler: @escaping ((@escaping ReplayPromise<Output, Failure>) -> Void)) {

            let current = currentValue
            let lock = dataLock

            let promise: ReplayPromise<Output, Failure> = { result in
                lock.lock()
                defer { lock.unlock() }

                switch result {
                case .success(let value):
                    current.send(value)
                    current.send(completion: .finished)
                case .failure(let error):
                    current.send(completion: .failure(error))
                }
            }

            defer {
                handler(promise)
            }

            self.outputValue = currentValue
                .compactMap { $0 }
                .eraseToAnyPublisher()
        }

        // MARK: Publisher

        public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
            dataLock.lock()
            defer { dataLock.unlock() }

            if let value = currentValue.value {
                let just = Just(value).setFailureType(to: Failure.self)
                just.receive(subscriber: subscriber)
            } else {
                outputValue.receive(subscriber: subscriber)
            }
        }
    }
}

With this class, you can instantiate it the same way you would a Future:

func fetch() -> AnyPublisher<Data, SomeError> {
    // `queue`, `data`, and `someError` stand in for the caller's own queue, result, and error.
    return Publishers.ReplayOnce { promise in
        queue.async {
            /// some work
            if let data = data {
                promise(.success(data))
            } else {
                promise(.failure(someError))
            }
        }
    }
    .eraseToAnyPublisher()
}

It’s funny how time marches on. Combine, for this sort of thing, is already on its way out (as far as my life is concerned) because async is a much better solution.

Publishers are still super useful in some contexts, but the use of Publishers as an aid to what is now much better done using async has rendered most of this as no longer useful to me.