Combine: Creating a custom Publisher by wrapping another Publisher and operating on it

I am trying to create a custom publisher that basically wraps a different publisher and performs a series of operators on it. The reason why I want a custom publisher is because I'm trying to encapsulate some logic for re-use within it and I also need to store some state.

Here is a simplified version of what I'm doing:

struct BundledPublisher<State, Output, Failure: Error>: Publisher {
    var wrapped: AnyPublisher<Output, Failure>
    var state: State
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        // Start with original publisher
        wrapped
            // Add some operators, not really important what these are
            .delay(for: .seconds(1), scheduler: DispatchQueue.main)
            .dropFirst()
            // forward the receive call
            .receive(subscriber: subscriber)
    }
}

My question is if this is a legal, acceptable and functioning workflow? Testing seems to indicate that this approach works correctly. But it's unclear to me if I'm missing something with regards to maybe back pressure or demand.

1 Like

This seems fine to me, but the main thing I continue to be unsure about in this context (i.e., wrapping publishers), is whether to call subscribe() or receive(subscriber:). I've seen advice on the internet that one should always call subscribe(), but I honestly still don't understand why, or what the implications are for not doing so.

1 Like

I've seen advice on the internet that one should always call subscribe()

Thanks for the reply @sharplet. Do you have any references to where you've seen that advice?

Thanks!

I'm going to give a canonical answer here that subscribe(…) in Combine has some additional observation logic that we would prefer get triggered before it forwards the subscription over, and that it is preferred over invoking receive(subscription:) directly.

3 Likes

Thank you @millenomi, I appreciate the prompt response! So this is an acceptable way to wrap another publisher then?

struct BundledPublisher<State, Output, Failure: Error>: Publisher {
    var wrapped: AnyPublisher<Output, Failure>
    var state: State
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        wrapped
            .delay(for: .seconds(1), scheduler: DispatchQueue.main)
            .dropFirst()
            .subscribe(subscriber)
    }
}

Is there any documentation that I can refer my colleagues to? Do you have some insider insight into how this is implemented?

Slightly off-topic, but if this is representative of your real custom publisher, it might be worth making it generic over the publisher type, instead of Output and Failure individually:

struct BundledPublisher<Wrapped: Publisher>: Publisher {
  typealias Output = Wrapped.Output
  typealias Failure = Wrapped.Failure

  var wrapped: Wrapped

  // ...
}

This would help you avoid intermediate AnyPublishers where they aren't strictly necessary.

1 Like

I do see this in the docs, which seems fairly clear that I shouldn't be calling receive(subscriber:) directly. Any additional references are welcome. Thank you.

/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of ``Publisher/receive(subscriber:)``.
/// Adopters of ``Publisher`` must implement ``Publisher/receive(subscriber:)``. The implementation of ``Publisher/subscribe(_:)-4u8kn`` provided by ``Publisher`` calls through to ``Publisher/receive(subscriber:)``.
///
/// - Parameter subscriber: The subscriber to attach to this publisher. After attaching, the subscriber can start to receive values.

Good point! Thank you