Combine nested publishers leak

Hello Swift Community,

After working with the brand-new Combine framework for a while, I recently hit an issue that gave my team and me a really hard time. I haven't found anything related to it in the docs or on Stack Overflow, so here I am.

The problems started when I began using URLSession's dataTaskPublisher. After several attempts and a few Xcode updates, I found out that its subscription must be stored (retained), otherwise the attached URLRequest is cancelled. That was fine, except that in my use case I needed to return the dataTaskPublisher as the result of a request function. At the time, in order to both return it and keep it alive, my team and I decided to wrap the dataTaskPublisher call inside a Future, which we would erase to AnyPublisher and return. That meant we had to subscribe to the dataTaskPublisher right inside the Future's closure using sink, and fulfil the promise with either the failure or the value inside sink's completion and value closures. This approach seemed very reasonable at the time, given that the code actually worked.

Nevertheless, after profiling our app, we discovered thousands of leaks coming from dataTaskPublisher's sink. After a few more days spent trying to figure out why this was happening, we noticed that the problem was caused by wrapping the dataTaskPublisher call and its sink inside the Future. We also found out that we could keep the dataTaskPublisher alive (and avoid cancelled requests) without the Future, simply by returning the publisher and subscribing to it somewhere else.

I'm new to reactive programming, so this might be a silly question, but I'm still very curious why it's wrong to sink to a publisher (dataTaskPublisher in this case) inside a Future's closure. Below is some code from our first approach (the one that caused the leaks).

        return Future<Data, NetworkError> { [weak self] promise in
            guard let self = self else { return }

            self.urlSession.dataTaskPublisher(for: request)
                .mapError {
                    if ($0 as NSError).code == URLError.timedOut.rawValue {
                        return NetworkError.timeout
                    }
                    return NetworkError.urlError($0)
                }
                .flatMap { [weak self] (data, response) -> AnyPublisher<Data, NetworkError> in
                    guard let response = response as? HTTPURLResponse else {
                        let error = NetworkError.invalidResponse
                        self?.requestPlugins.forEach {
                            $0.didReceive(.failure(error), on: endpoint)
                        }
                        return .fail(error)
                    }

                    guard 200..<300 ~= response.statusCode else {
                        let error: NetworkError
                        if response.statusCode == 401 {
                            error = .unauthorized(data: data)
                            self?.unauthorizedPublisher.send(error)
                        } else {
                            error = .serverError(statusCode: response.statusCode, data: data)
                        }
                        self?.requestPlugins.forEach {
                            $0.didReceive(.failure(error), on: endpoint)
                        }
                        return .fail(error)
                    }

                    self?.requestPlugins.forEach {
                        $0.didReceive(.success((response, data)), on: endpoint)
                    }

                    return .just(data)
                }
                .sink(receiveCompletion: { completion in
                    switch completion {
                    case .failure(let error):
                        promise(.failure(error))
                    case .finished:
                        break
                    }
                }, receiveValue: {
                    promise(.success($0))
                })
                .store(in: &self.cancellableSet)
        }.eraseToAnyPublisher()

Thank you.

I'm not quite sure where the leak comes from, but it would be much easier to track down without the outer Future, which, as far as I understand, isn't needed here. You also don't need to create an intermediate subscription with store(in: &self.cancellableSet), which might be the source of the leak in the first place. I don't think your client code would expect a new subscription to be created as soon as it gets this publisher as a result. It's usually the client code's responsibility (that is, whatever code receives the resulting publisher) to set up subscriptions and manage them as it sees fit.
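A minimal sketch of that split, assuming a hypothetical `APIClient` and `Caller` (neither is from the original code): the request function only builds and returns the pipeline, with no Future and no internal `store(in:)`, and the caller keeps the `AnyCancellable` alive for as long as it wants the request to run.

```swift
import Combine
import Foundation

// Hypothetical client type used for illustration only.
final class APIClient {
    private let session: URLSession

    init(session: URLSession = .shared) {
        self.session = session
    }

    // Builds the pipeline but does not subscribe to it: nothing is stored
    // on `self`, so the client does not own any subscription.
    func data(for url: URL) -> AnyPublisher<Data, URLError> {
        session.dataTaskPublisher(for: url)
            .map(\.data)
            .eraseToAnyPublisher()
    }
}

// The caller decides how long the subscription lives by retaining the
// AnyCancellable; releasing it cancels the underlying data task.
final class Caller {
    private let client = APIClient()
    private var cancellables = Set<AnyCancellable>()
    private(set) var received: Data?

    func load(_ url: URL, onDone: @escaping () -> Void) {
        client.data(for: url)
            .sink(receiveCompletion: { _ in onDone() },
                  receiveValue: { [weak self] data in self?.received = data })
            .store(in: &cancellables)
    }
}
```

Because each subscription goes into a `Set`, several requests can be in flight at once without one replacing (and cancelling) another.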

In the meantime I think I also found the source of the leak: it seems that sink's closure is somehow capturing the promise from the Future. That makes me think even Apple's example of Future usage would create leaks:

func performAsyncActionAsFuture() -> Future<Void, Never> {
    return Future() { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            promise(Result.success(()))
        }
    }
}

My other leaking function looks something like this:

private func updateArea(_ field: AreaUpdateField) -> AnyPublisher<Bool, AreaResponseError> {
    Future<Bool, AreaResponseError> { [weak self] promise in
        self?.persistancePlugin.update(field) { result in
            promise(result)
        }
    }.eraseToAnyPublisher()
}

So I'm really not sure whether the problem is in my code or in Combine's Future class, but I tend to believe it has to do with Combine.

Apple's example won't create any leaks, as the only strong reference created in the Future's closure is to DispatchQueue.main, which is a global singleton that one wouldn't expect to be deallocated at any point anyway.

In your second example it very much depends on what owns the resulting Future and the subscription to it, if one is created. Of course, if self or persistancePlugin owns the Future instance (or a subscription to it), that creates a reference cycle, and you would have to break it manually at some point.
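To make the ownership problem concrete, here is a small sketch with a hypothetical `AreaStore` class. When an object stores a subscription whose closure captures it strongly, object → AnyCancellable → sink closure → object forms a cycle and nothing is freed; capturing `self` weakly in the sink breaks it.

```swift
import Combine

// Hypothetical type for illustration: it owns a subscription to `updates`.
final class AreaStore {
    private var cancellables = Set<AnyCancellable>()
    private(set) var lastValue = 0

    init(updates: PassthroughSubject<Int, Never>, captureWeakly: Bool) {
        if captureWeakly {
            // self -> AnyCancellable -> closure -(weak)-> self: no cycle.
            updates.sink { [weak self] in self?.lastValue = $0 }
                .store(in: &cancellables)
        } else {
            // self -> AnyCancellable -> closure -> self: a reference cycle.
            updates.sink { self.lastValue = $0 }
                .store(in: &cancellables)
        }
    }
}
```

In the strongly capturing case the instance stays alive after its last outside reference goes away, which is exactly the kind of cycle you would have to break manually (for example by cancelling the subscription).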

In general, I agree that Combine introduces a fair amount of mental overhead that you have to be constantly aware of, made worse by the lack of documentation and clear examples. It is still one of the best abstractions available, especially because it's first-party and more developers are becoming acquainted with it, so for now the overhead is probably warranted until something better comes along.

What has helped me write more effective Combine code is following a few rules:

  • Make sure you understand the lifecycle of your subscription: when is it created and can it be potentially created multiple times, when do upstream publishers complete and when do you dispose of the subscription.
  • Make sure you don't introduce "Result of call is unused" warnings, especially for function calls that return publishers. This warning most probably means you don't subscribe to your publisher or that your subscription is not retained, so it won't be handled properly.
  • Avoid capturing instances of reference types in any of your operators; if this is unavoidable, it's better to capture those references within sink, with an explicit [weak self] capture list.
  • It's better to keep Future a low-level abstraction and not expose it at higher levels, especially due to the mental overhead of understanding the lifetime of your Future and its completion closures if those capture any reference types. If you need to trigger another publisher after a Future has completed, chain it with flatMap, and avoid creating strong references in the flatMap closure.
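The first two rules are easy to see in a few lines: `sink` returns an `AnyCancellable`, and if nothing retains it the subscription is cancelled as soon as that value is released, so later values never arrive. A sketch, with a `PassthroughSubject` standing in for any upstream publisher:

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var droppedValues: [Int] = []
var keptValues: [Int] = []

// The AnyCancellable is discarded immediately, which cancels the subscription.
_ = subject.sink { droppedValues.append($0) }

// This subscription lives for as long as `kept` is retained.
let kept = subject.sink { keptValues.append($0) }

subject.send(1)
subject.send(2)

// Explicitly ending the subscription once we're done with it.
kept.cancel()
subject.send(3)
```

Here `droppedValues` stays empty and `keptValues` ends up as `[1, 2]`: the value sent after `cancel()` is not delivered either.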

Hope this helps 🙂

Hi Max,

Your answer definitely helped me, or at least gave me a hint about what the problem might be. After digging deep into my persistence plugin, I discovered that the call to the completion closure of the update function I posted above was wrapped in a DispatchQueue.main.async block. After I moved the closure call out of that block, the leaks disappeared. I also took your advice and removed the nested callbacks I had in the updateArea function (another call was made to the persistence plugin inside the result closure of the update call, which I didn't include in the snippet above), and instead chained multiple Future publishers with flatMap.
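Chaining instead of nesting might look like this sketch. `loadArea(id:completion:)`, `saveArea(_:completion:)`, `Area` and `AreaError` are hypothetical stand-ins for the persistence plugin's callback API; each call is wrapped in its own small Future, and the save step is only started from `flatMap` after the load succeeds.

```swift
import Combine

// Hypothetical stand-ins for the persistence plugin's callback API.
struct Area { var id: String; var brightness: Int }
enum AreaError: Error { case notFound }

func loadArea(id: String, completion: @escaping (Result<Area, AreaError>) -> Void) {
    completion(id == "kitchen" ? .success(Area(id: id, brightness: 10)) : .failure(.notFound))
}

func saveArea(_ area: Area, completion: @escaping (Result<Bool, AreaError>) -> Void) {
    completion(.success(true))
}

// Each callback API gets its own small Future...
func loadAreaPublisher(id: String) -> AnyPublisher<Area, AreaError> {
    Future<Area, AreaError> { promise in loadArea(id: id, completion: promise) }
        .eraseToAnyPublisher()
}

func saveAreaPublisher(_ area: Area) -> AnyPublisher<Bool, AreaError> {
    Future<Bool, AreaError> { promise in saveArea(area, completion: promise) }
        .eraseToAnyPublisher()
}

// ...and the steps are chained rather than nested inside each other's callbacks.
func updateBrightness(id: String, to brightness: Int) -> AnyPublisher<Bool, AreaError> {
    loadAreaPublisher(id: id)
        .map { area -> Area in
            var updated = area
            updated.brightness = brightness
            return updated
        }
        .flatMap(saveAreaPublisher)
        .eraseToAnyPublisher()
}
```

A failure in the load step ends the chain with that error, so the save is never attempted.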

Thank you very much!

Hey Vlad,

You've already had some good advice, and it sounds like you're clearing most of the bad bits away. Let me try to add a few Combine-specific quirks so that you're aware of them.

First, avoid nesting publishers: it complicates the closures and can lead to a number of these issues. For most of the cases you're aiming at, you can do the same work in a single chain that starts from one publisher.

The top-level publisher you're using to get the effect you want here is Future, which is something of a non-conformist compared to most other publishers. Future has a quirk: the closure for its asynchronous call is invoked immediately upon creation, whereas most other publishers don't do much "heavy lifting" until they receive a subscription. The whole point of that sequence (subscribers "driving the train", as it were) is to support backpressure and the cancellation mechanisms that let Combine skip work it doesn't need to do (well, until it does), represented by a subscription and its associated demand.
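The eager behaviour is easy to check with a counter. In this sketch the Future's closure runs while the Future is being created, before anything subscribes, and a second subscriber gets the stored result without the closure running again.

```swift
import Combine

var runs = 0

let future = Future<Int, Never> { promise in
    runs += 1              // executed during Future.init
    promise(.success(42))
}

let runsBeforeSubscribing = runs   // already 1

var values: [Int] = []
let first = future.sink { values.append($0) }
let second = future.sink { values.append($0) }
```

After both subscriptions, `runs` is still 1 and `values` is `[42, 42]`.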

A number of folks who've used Future stumble over this (including me when I was learning it), and it took me a while to realize that about the only nested pattern of publishers I'd want to use is Deferred wrapping a Future to make an async background call. That pattern is immensely useful for wrapping existing async APIs and objects and converting them into a Combine pipeline. If you're starting with URLSession.dataTaskPublisher, though, wrapping it in a Future goes against the framework's design; you're kind of swimming uphill with that.
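A sketch of the Deferred-wrapping-Future pattern, assuming a hypothetical callback-based `fetchTemperature(completion:)` API: nothing runs until something subscribes, and every subscription gets its own fresh Future, and therefore its own call.

```swift
import Combine

var calls = 0

// Hypothetical callback-based API standing in for any existing async call.
func fetchTemperature(completion: @escaping (Result<Double, Error>) -> Void) {
    calls += 1
    completion(.success(21.5))
}

// Deferred creates the Future (and so starts the work) once per subscription.
func temperaturePublisher() -> AnyPublisher<Double, Error> {
    Deferred {
        Future<Double, Error> { promise in
            fetchTemperature(completion: promise)
        }
    }
    .eraseToAnyPublisher()
}

let publisher = temperaturePublisher()
let callsBeforeSubscribing = calls   // still 0: nothing has subscribed yet

var readings: [Double] = []
let a = publisher.sink(receiveCompletion: { _ in }, receiveValue: { readings.append($0) })
let b = publisher.sink(receiveCompletion: { _ in }, receiveValue: { readings.append($0) })
```

Compared with a bare Future, the trade-off is that each subscriber triggers its own call; if the work should happen only once, sharing is a separate concern.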

A side effect of this immediate-Future behaviour is that operators like retry() (assuming you might want to use such a thing) do their work by manipulating subscriptions, not by creating publishers, so a raw Future doesn't work with retry() worth a damn.
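Here is a sketch of that difference, counting how many times the work actually runs when a failing publisher is retried twice. `TransientError` is a made-up error type.

```swift
import Combine

struct TransientError: Error {}

var rawAttempts = 0
var deferredAttempts = 0

// A raw Future runs once; retry() re-subscribes to the same, already-failed Future.
let raw = Future<Int, TransientError> { promise in
    rawAttempts += 1
    promise(.failure(TransientError()))
}
.retry(2)

// Deferred builds a new Future for every (re)subscription, so retry() re-runs the work.
let deferred = Deferred {
    Future<Int, TransientError> { promise in
        deferredAttempts += 1
        promise(.failure(TransientError()))
    }
}
.retry(2)

var rawFailed = false
var deferredFailed = false
let c1 = raw.sink(receiveCompletion: { if case .failure = $0 { rawFailed = true } },
                  receiveValue: { _ in })
let c2 = deferred.sink(receiveCompletion: { if case .failure = $0 { deferredFailed = true } },
                       receiveValue: { _ in })
```

Both end in failure, but the raw Future did its work once (`rawAttempts == 1`), while the deferred version ran the initial attempt plus two retries (`deferredAttempts == 3`).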

If the point of the Future was to make a request once and share it between multiple subscribers, then a Subject, or share() or multicast(), can solve that for you. Either mechanism (they may be the same under the covers, I just don't know) encapsulates the subscriptions from multiple endpoints into a single request whose result is shared when it completes. You need to have all the subscribers connected before the request completes for that to be useful, but that's generally pretty doable to arrange.
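A sketch of sharing one piece of work between two subscribers with `multicast` and an explicit `connect()`, which makes it easy to attach every subscriber before the work starts. The Deferred Future stands in for a network request and `executions` just counts how often it runs. (Apple's documentation describes `share()` as effectively `multicast` with a `PassthroughSubject` plus an implicit `autoconnect()`, which is why late subscribers can miss the result there.)

```swift
import Combine

var executions = 0

let request = Deferred {
    Future<String, Never> { promise in
        executions += 1                  // the "expensive" work
        promise(.success("payload"))
    }
}

// One PassthroughSubject fans the single upstream result out to all subscribers.
let shared = request.multicast { PassthroughSubject<String, Never>() }

var received: [String] = []
let first = shared.sink { received.append("first: \($0)") }
let second = shared.sink { received.append("second: \($0)") }

// Nothing runs until connect(); both subscribers are already attached by now.
let connection = shared.connect()
```

The upstream work runs exactly once, and both subscribers receive `"payload"`.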

The pattern I use for checking errors vs. URLResponse status codes is slightly different from yours: I like to put it in a tryMap() that looks at the status code, consolidating the errors after that point so that I can attach a flatMap (with its own internal pipeline) along with a catch to handle error conditions consistently. Nothing wrong with your code; I just thought I'd share an alternative way to tackle the same issue. You can see an example of that pattern in my Using Combine work, under the pattern "Stricter request processing with dataTaskPublisher". I'd also point out Matt Neuburg's free work Understanding Combine, where he does a really nice job of the tutorial process.
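A rough sketch of tryMap-based status-code validation, not Joseph's exact code: `HTTPError`, `validated(_:)` and `fakeResponse(status:)` are made-up names, and `Just` stands in for `dataTaskPublisher` so the snippet runs without a network (its output has the same `(data:response:)` shape).

```swift
import Combine
import Foundation

// Hypothetical error type for illustration.
enum HTTPError: Error, Equatable {
    case invalidResponse
    case unauthorized
    case server(statusCode: Int)
}

// Validates the status code in one place; anything thrown ends the stream with a failure.
func validated<P: Publisher>(_ upstream: P) -> AnyPublisher<Data, Error>
    where P.Output == (data: Data, response: URLResponse) {
    upstream
        .tryMap { output -> Data in
            guard let http = output.response as? HTTPURLResponse else {
                throw HTTPError.invalidResponse
            }
            switch http.statusCode {
            case 200..<300: return output.data
            case 401: throw HTTPError.unauthorized
            default: throw HTTPError.server(statusCode: http.statusCode)
            }
        }
        .eraseToAnyPublisher()
}

// Builds a canned response with the given status code, standing in for a real request.
func fakeResponse(status: Int) -> Just<(data: Data, response: URLResponse)> {
    let url = URL(string: "https://example.com")!
    let response: URLResponse = HTTPURLResponse(url: url, statusCode: status,
                                                httpVersion: nil, headerFields: nil)!
    return Just((data: Data("ok".utf8), response: response))
}
```

Because every failure leaves `tryMap` as a plain `Error`, a single `catch` or `mapError` further down can handle all of them in one place.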

Hey Joseph,

Thank you for your reply. You've added some useful information on top of Max's answer above. I also browsed through your book a lot over the past few days, trying to figure out my problem and understand the mechanics of these publishers in greater detail. I think it's very well done, and I'm planning to read it cover to cover.

Coming back to my issue and the example in my initial post: the Future wrapping the dataTaskPublisher did turn out to be useless, and I've since removed it. I originally added that Future (a long time ago) to keep my dataTaskPublisher's subscription alive, because I noticed all the requests were cancelled if they weren't assigned to an AnyCancellable. After I removed the Future completely and simply returned the dataTaskPublisher as an AnyPublisher to be subscribed to in a higher-level layer, the requests were still cancelled. That was because at the point of subscription I was assigning the sink to a single AnyCancellable property, when I should have stored it in a set of AnyCancellables (I was making a large number of these requests in a short amount of time, so I guess each previous AnyCancellable was deallocated when it was replaced by the next one). Anyway, this solved my problem for the URL requests at least.
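That explanation matches how `AnyCancellable` behaves: assigning a new one to the same property releases the old one, and releasing it cancels its subscription. A sketch with two in-flight "requests" simulated by subjects:

```swift
import Combine

let requestA = PassthroughSubject<String, Never>()
let requestB = PassthroughSubject<String, Never>()

// Single property: the second assignment releases (and cancels) the first subscription.
var single: AnyCancellable?
var singleResults: [String] = []
single = requestA.sink { singleResults.append($0) }
single = requestB.sink { singleResults.append($0) }

// Set: every subscription stays alive until it's removed or the set is released.
var cancellables = Set<AnyCancellable>()
var setResults: [String] = []
requestA.sink { setResults.append($0) }.store(in: &cancellables)
requestB.sink { setResults.append($0) }.store(in: &cancellables)

requestA.send("A")
requestB.send("B")
```

With the single property only `"B"` arrives, because the subscription to `requestA` was cancelled by the reassignment; the set receives both.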

But there's more to it 😅. As my use case also involved a database update in parallel with every backend request, I discovered some leaks on that side as well. That brings us to the persistancePlugin example I gave Max in my second reply. For this case I had to:

  1. Load the corresponding area object from the database
  2. Update it with the changes in areaUpdateField
  3. Persist it back

In this example, the closure that receives the result of the update (and calls promise) is passed all the way down through two more layers of abstraction, and is finally called inside the private NSManagedObjectContext's perform closure with the result of the persist (true or false). After some investigation I concluded that the function in which perform was called received the completion as an @escaping parameter and was somehow capturing it. One workaround was to make the closure parameter non-escaping and call performAndWait instead of perform.

After some more thinking I realized my whole flow was designed wrong. So I moved the Future from the update function at the highest level down to the persistence provider, where the managed object context actually performs the work, and that is where I now call promise as well. At the highest level of abstraction I now only subscribe to the returned publisher and map its result, so my function looks more like this:
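A sketch of such a provider-level wrapper, assuming a hypothetical `PersistenceProvider` that owns one private-queue context (the names are illustrative, not the thread's actual API): the work runs inside `perform`, `promise` is called there, and Deferred postpones the `perform` call until someone subscribes. The `work` closure is a placeholder for the real fetch or save.

```swift
import Combine
import CoreData

// Hypothetical provider; names are illustrative only.
final class PersistenceProvider {
    private let context: NSManagedObjectContext

    init(context: NSManagedObjectContext) {
        self.context = context
    }

    // Runs `work` on the context's private queue once per subscription and
    // fulfils the promise from inside `perform`.
    func performPublisher<T>(_ work: @escaping (NSManagedObjectContext) throws -> T) -> AnyPublisher<T, Error> {
        let context = self.context   // capture the context, not self
        return Deferred {
            Future<T, Error> { promise in
                context.perform {
                    // Result(catching:) turns a thrown error into .failure.
                    promise(Result { try work(context) })
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
```

Higher layers then only compose these publishers (as in the flatMap chain below) instead of threading completion closures through several layers.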

private func updateLocalArea(_ field: AreaUpdateField) -> AnyPublisher<Bool, AreaResponseError> {
    loadArea(with: field.id, field: field)
        .flatMap(persistArea)
        .eraseToAnyPublisher()
}

private func loadArea(with id: String, field: AreaUpdateField) -> AnyPublisher<AreaDTO, AreaResponseError> {
    self.areaPersistencePlugin.loadArea(with: id)
        .map { areaDTO in
            var area = areaDTO
            switch field {
            case .brightness(_, let brightness):
                area.action.brightness = brightness
            case .on(_, let on):
                area.action.on = on
            case .nameAndIcon(_, let name, let icon):
                area.name = name
                area.icon = icon
            }
            
            return area
        }
        .mapError { .updateAreaFailed($0.localizedDescription) }
        .eraseToAnyPublisher()
}

private func persistArea(areaDTO: AreaDTO) -> AnyPublisher<Bool, AreaResponseError> {
    areaPersistencePlugin.persist(areas: [areaDTO])
        .mapError { .updateAreaFailed($0.localizedDescription) }
        .eraseToAnyPublisher()
}

Joseph, if you have time to read through my whole reply, please correct me if any of my assumptions are wrong.

Thank you.

Best regards.

Hey guys @Max_Desiatov @Joseph_Heck.

Even though it's been almost a month of inactivity on this topic, I have a small update that I think is worth mentioning. As the project in which I first discovered the problems couldn't wait for this bug to be fixed, I tried to replicate the same problem in a small side project to investigate it further. My intention was to create a small Core Data provider class exposing an API to fetch and persist some data. The only difference between this one and the provider in the main project is that it uses NSPersistentContainer's performBackgroundTask closure to perform the operation instead of NSManagedObjectContext's perform. Here's my implementation of the fetch:

func load<Entity: CoreDataMappable>(entitiesOfClass entityClass: Entity.Type) -> AnyPublisher<[Entity], Error> {
    Future<[Entity], Error> { [weak self] promise in
        guard let self = self else { return }
        self.persistentContainer.performBackgroundTask { _ in
            do {
                let results = try self.findAll(Entity.CDEntity.self).map { $0.wrapped() }
                promise(.success(results))
            } catch {
                promise(.failure(error))
            }
        }
    }.eraseToAnyPublisher()
}

I'd call it a success if it weren't a problem 😒, but I managed to reproduce the leaks in my side project. At this point I became really curious to see Apple's implementation of Future. Of course that wasn't possible, but the closest I could get was the OpenCombine library on GitHub, whose Future implementation was written (if I'm not wrong 😅) by @Max_Desiatov himself.
Most likely Apple's implementation of Future differs from OpenCombine's. Nevertheless, after I replaced Apple's Future in the load function above with Max's Future, surprise: no more leaks. Nothing else changed besides the implementation of Future and the AnyCancellable subscription to it.

What's your opinion on this? Could this be a bug in Combine, or could I still be doing something wrong that makes Combine's Future leak? The specific instance that leaks is Future<,>.Conduit.

Thank you for your support.

This is some great investigation. I'd love to know what the difference is under the hood and why one implementation leaks but the other doesn't 🤔. Mine was a very naive implementation based on what I'd previously seen in numerous other futures/promises implementations on GitHub, though most of my previous experience was with BrightFutures.

I'm curious about the differences too. It's also worth mentioning that after several attempts with Combine's Future (before trying OpenCombine), I tried to create a specialized Publisher for persistence operations, modelled on dataTaskPublisher. Here's the code:

import Combine
import CoreData

typealias PersistanceResult<Output, Error: Swift.Error> = Result<Output, Error>

final class PersistancePublisher<Output, ErrorType: Error>: Publisher {
    // MARK: - Properties

    // The generic parameter `Output` already satisfies Publisher's `Output`
    // requirement, so only `Failure` needs a typealias.
    typealias Failure = ErrorType

    private let peristanceContainer: NSPersistentContainer
    private let closure: (NSManagedObjectContext) -> PersistanceResult<Output, ErrorType>

    // MARK: - Init

    init(persistentContainer: NSPersistentContainer,
         closure: @escaping (NSManagedObjectContext) -> PersistanceResult<Output, ErrorType>) {
        self.peristanceContainer = persistentContainer
        self.closure = closure
    }

    // MARK: - Publisher

    // Note: this never calls subscriber.receive(subscription:), which the
    // Publisher contract expects before any values or completion are sent.
    func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, S.Input == Output {
        peristanceContainer.performBackgroundTask { ctx in
            let result = self.closure(ctx)
            switch result {
            case .success(let value):
                _ = subscriber.receive(value)
                subscriber.receive(completion: .finished)
            case .failure(let error):
                subscriber.receive(completion: .failure(error))
            }
        }
    }
}

extension NSPersistentContainer {
    func persistanceTaskPublisher<R>(closure: @escaping (NSManagedObjectContext) -> Result<R, Error>) -> AnyPublisher<R, Error> {
        PersistancePublisher(persistentContainer: self, closure: closure)
            .eraseToAnyPublisher()
    }
}

My implementation is even more naive (and by the way, please point out if I missed something or made a mistake), but curiously it doesn't leak, and it does the job very well.
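One thing worth pointing out: `receive(subscriber:)` above never calls `subscriber.receive(subscription:)`, which Combine's Publisher contract requires before any values or completion are sent, and it starts the work without waiting for demand or honouring cancellation. Below is a sketch of a publisher that follows the contract. It uses a plain `DispatchQueue` in place of `performBackgroundTask` so it runs without a Core Data stack, and `BackgroundResultPublisher` is a made-up name.

```swift
import Combine
import Foundation

// Hypothetical publisher: runs `work` on `queue` once per subscription,
// starting only after the subscriber requests demand.
struct BackgroundResultPublisher<Output, Failure: Error>: Publisher {
    let queue: DispatchQueue
    let work: () -> Result<Output, Failure>

    func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
        // The contract: hand the subscriber a Subscription before anything else.
        subscriber.receive(subscription: Inner(subscriber: subscriber, queue: queue, work: work))
    }

    private final class Inner<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure {
        private let lock = NSLock()
        private var subscriber: S?
        private var started = false
        private let queue: DispatchQueue
        private let work: () -> Result<Output, Failure>

        init(subscriber: S, queue: DispatchQueue, work: @escaping () -> Result<Output, Failure>) {
            self.subscriber = subscriber
            self.queue = queue
            self.work = work
        }

        func request(_ demand: Subscribers.Demand) {
            guard demand > 0 else { return }
            lock.lock()
            let shouldStart = !started && subscriber != nil
            started = true
            lock.unlock()
            guard shouldStart else { return }

            queue.async {
                let result = self.work()
                // Take the subscriber out under the lock; nil means we were cancelled.
                self.lock.lock()
                let subscriber = self.subscriber
                self.subscriber = nil
                self.lock.unlock()
                guard let subscriber = subscriber else { return }

                switch result {
                case .success(let value):
                    _ = subscriber.receive(value)
                    subscriber.receive(completion: .finished)
                case .failure(let error):
                    subscriber.receive(completion: .failure(error))
                }
            }
        }

        func cancel() {
            lock.lock()
            subscriber = nil   // nothing is delivered after cancellation
            lock.unlock()
        }
    }
}
```

If a custom type isn't needed, `Deferred { Future { ... } }` around `performBackgroundTask` gives similar per-subscription laziness with much less code.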

Looks like good digging, but I'm afraid I don't have any insight into how either of the two work myself. All my work has been empirically testing Apple's combine as a "black box" and documenting the results, inferring what it's doing from there.

That doesn't preclude there being a leak or bug, but neither could I assert there isn't one. Sorry, I don't have any insight that might help here.

Your use of [weak self] is of no benefit. Future executes its closure immediately and synchronously (before Future.init returns), so that weak self reference is guaranteed to be non-nil when the closure runs and converts it back to a strong reference. Then when you call self.findAll in the inner (background task) closure, it's still a strong reference.
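A sketch of the alternative, with a hypothetical `ItemStore` and a serial `DispatchQueue` standing in for `performBackgroundTask`: instead of `[weak self]` on the Future's closure (which is upgraded to a strong reference right away), capture only the values the background work needs, so pending work doesn't keep the store itself alive.

```swift
import Combine
import Foundation

// Hypothetical store used only for illustration.
final class ItemStore {
    private let queue = DispatchQueue(label: "ItemStore.background")
    private let items: [String]

    init(items: [String]) {
        self.items = items
    }

    func load() -> AnyPublisher<[String], Never> {
        // Copy what the background work needs; neither closure below mentions self.
        let queue = self.queue
        let snapshot = items
        return Future<[String], Never> { promise in
            queue.async { promise(.success(snapshot.sorted())) }
        }
        .eraseToAnyPublisher()
    }
}
```

The store can be deallocated as soon as its last owner lets go, while the returned publisher still delivers `["a", "b"]` for an input of `["b", "a"]`.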

I'm experiencing dozens of the same Future<,>.Conduit leaks all over my app. According to Xcode's memory debugger nothing is actually referencing them, and I haven't been able to find anything else being leaked or referenced by them, so I've been guessing it must be a Combine bug and just accepted it for now.

@Onne Have you thought about filing a bug report with Apple about this?

I've only had bad experiences filing radars, so I've pretty much given up on it; sorry for being bitter. I was actually expecting it to be fixed soon, because it seems very easy to reproduce: pretty much any normal use of Future seems to cause it (but maybe I'm wrong and doing something weird). It also just hasn't been a huge priority for me, since the leak is pretty small.

Yeah, I totally understand. If this really is a problem with every normal use of Future (as you're saying), I'm looking forward to it being fixed.