Combine - How to zip a value with a flatMap publisher and get results in a tuple

I have a "queue" defined like so:

private var queue: PassthroughSubject<RequestProtocol, Never> = .init()

And I allow consumers to submit "Requests" to it:

func submit(request: RequestProtocol) {
    queue.send(request)
}

I have a RequestOperation that can publish the response, but it doesn't know anything about how those results should be processed. That information is stored in some context, we call it "RequestProtocol".

So I would like to perform the operation and have the value zipped with the context (RequestProtocol).

I was trying to do so like this:

        let operationPublisher = queue
        .buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropOldest)
        .flatMap(maxPublishers: .max(maxConcurrentOperationCount)) {
            RequestOperation(request: $0.urlRequest).publisher().unfailable()
        }

    queue
        .zip(operationPublisher)
        .sink { (request, result) in
            switch result {
            case .success(let (data, response)):
                // process success
            case .failure(let error):
                // process failure
            }
        }.store(in: &subscriptions)

And that works when maxConcurrentOperationCount is 1. However, when that value is more than one and operations can run concurrently, I end up with tuples that have mixed up context/results. That is because the operations can finish out of order.

I think this is all possible to do with Combine, but I am missing something. One obvious solution would be to move the context to be a property of the operation. But that breaks some internal separation of concerns that we would like to maintain.

Thanks for any help.

It looks like you want to monitor requests and subscribe to their results, so it seems strange to me that you're trying to sink on the queue, which is just a publisher of requests.

I think what you really should be doing is sinking on the operationPublisher. Currently, it's a publisher of what ever the value published by RequestOperation.publisher() is, presumably some result type. So perhaps resultPublisher would be a more appropriate name.

You could change this from a publisher of results to a publisher of (request, result) quite easily in that case:

let resultPublisher = queue
        .buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropOldest)
        .flatMap(maxPublishers: .max(maxConcurrentOperationCount)) { request in
            RequestOperation(request: request.urlRequest)
                .publisher()
                .unfailable()
                .map { (request, $0) }
        }

Here's a more complete example that I knocked up in a Playground:

struct Request {
    let id = UUID()
    let delay: TimeInterval
}

struct RequestOperation {
    typealias Response =  Result<String, Never>
    
    let id: UUID
    let delay: TimeInterval
    
    func publisher() -> AnyPublisher<Response, Never> {
        return Future { promise in
            DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
                promise(.success(.success("it worked")))
            }
        }.eraseToAnyPublisher()
    }
}

let queue = PassthroughSubject<Request, Never>()

let resultPublisher = queue
    .buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropOldest)
    .flatMap(maxPublishers: .max(5)) { request in
        RequestOperation(id: request.id, delay: request.delay)
            .publisher()
            .map { (request, $0) }
    }


let cancellable = resultPublisher.sink { (request, result) in
    if case let .success(value) = result {
        print("Request: \(request.id), result: \(value)")
    }
}

queue.send(Request(delay: 1))
queue.send(Request(delay: 2))
queue.send(Request(delay: 3))
1 Like

@lukeredpath, Thank you for taking the time to understand my question and come up with an answer. Your answer seems to work wonderfully. And you are right, I was thinking about the problem from the wrong angle.

I eventually came up with this very over-complicated operator to help:

internal extension Publisher where Failure == Never {
/// Flatmap a publisher and zips it with the value of the current publisher.
/// If the publisher that you flatMap with doesn't finish the operations in order,
/// this operator takes care of the logic to make sure the correct context (zipped value)
/// comes back with the correct flat mapped value.
/// - Parameters:
///   - maxPublishers: The number of concurrent operations
///   - transform: A closure that takes an element as a parameter and returns a publisher
/// that produces elements of that type.
/// - Returns: A publisher that transforms elements from an upstream publisher into
/// a publisher of that element’s type.
func zipFlatMap<T, P>(maxPublishers: Subscribers.Demand = .unlimited, _ transform: @escaping (Self.Output) -> P) ->
    AnyPublisher<(T, P.Output), Self.Failure> where T == Self.Output, P: Publisher, Self.Failure == P.Failure {
        map {
            ($0, transform($0))
        }
        .flatMap(maxPublishers: maxPublishers) { (value) -> AnyPublisher<(T, P.Output), Self.Failure> in
            Just(value.0).zip(value.1.flatMap { Just($0) }).eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
} 

I wrote tests for it and it did what I wanted it to do. But replacing it with your logic:

    func zipFlatMap<T, P>(maxPublishers: Subscribers.Demand = .unlimited, _ transform: @escaping (Self.Output) -> P) ->
AnyPublisher<(T, P.Output), Self.Failure> where T == Self.Output, P: Publisher, Self.Failure == P.Failure {
    flatMap(maxPublishers: maxPublishers) { tValue in
        transform(tValue).map { pOutput in (tValue, pOutput) }
    }.eraseToAnyPublisher()
}

My code seems to work exactly as it should and now I probably don't even need to define this operator because the code in it is so simple.

Thank you for helping me understand this in a new way and providing exactly the solution I needed.

1 Like
Terms of Service

Privacy Policy

Cookie Policy