Combine equivalent to RxSwift shareReplay operators

Hi,

I am trying to create a publisher that performs a network request when first subscribed and replays the result to each new subscriber. This can be done in RxSwift using the shareReplay operators.

I thought that using the share operator together with the buffer operator would do the trick. Here's my attempt:

struct Country: Codable {
	let name: String
	let nativeName: String
	let capital: String
}

var cancellables = Set<AnyCancellable>()

let url = URL(string: "https://restcountries.eu/rest/v2/alpha/es")!
let countryPublisher = URLSession.shared.dataTaskPublisher(for: url)
	.map { $0.data }
	.decode(type: Country.self, decoder: JSONDecoder())
	.print()

let sharedCountryPublisher = countryPublisher
	.share()
	.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)

sharedCountryPublisher
	.sink(receiveCompletion: { print("1: \($0)") },
		  receiveValue: { print("1: \($0)") })
	.store(in: &cancellables)

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
	sharedCountryPublisher
		.sink(receiveCompletion: { print("2: \($0)") },
			  receiveValue: { print("2: \($0)") })
		.store(in: &cancellables)
}

Unfortunately the second subscriber doesn't get any buffered value, just a completion event. If you run this in a Swift playground, the output is as follows:

receive subscription: (Decode)
request unlimited
receive value: (Country(name: "Spain", nativeName: "España", capital: "Madrid"))
1: Country(name: "Spain", nativeName: "España", capital: "Madrid")
receive finished
1: finished
2: finished

Am I missing something? Is there any other way to get this behaviour with Combine?

EDIT: I was clearly misunderstanding the buffer operator, which serves as a mechanism to throttle back pressure and not for replaying values. I guess Combine doesn't have yet a replay or shareReplay operator. Any plans to support that in the future?

Thanks in advance,
Guille

3 Likes

Hi,

Last time I needed a shareReplay publisher, I struggled with the undocumented buffer method, as you did, until I gave up and just used the shareReplay that ships with Entwine.

I'm not sure I'm comfortable with this third-party library yet. I'm always scared when I look inside the guts of reactive libraries, they all look like they are doing premature abstraction, for unclear benefits. But who am I to judge? And it may drag you out of a pit.

Thanks for the suggestion, I will take a look at this library. For the moment, I am using a CurrentValueSubject to publish the results of the network operation I'd like to share with multiple subscribers.

@gonzalezreal I've used your example to learn more about Combine :slight_smile:

You may already have a better solution, but if you define sharedCountryPublisher as

let sharedCountryPublisher = countryPublisher
    .multicast(subject: ReplaySubject())
    .autoconnect()

or even

extension Publisher {
    func shareReplay(maxValues: Int = 0) -> AnyPublisher<Output, Failure> {
        multicast(subject: ReplaySubject(maxValues: maxValues)).autoconnect().eraseToAnyPublisher()
    }
}

let sharedCountryPublisher = countryPublisher.shareReplay()

then it should work using this ReplaySubject:

final class ReplaySubject<Input, Failure: Error>: Subject {
    typealias Output = Input
    private var recording = Record<Input, Failure>.Recording()
    private let stream = PassthroughSubject<Input, Failure>()
    private let maxValues: Int
    init(maxValues: Int = 0) {
        self.maxValues = maxValues
    }
    func send(subscription: Subscription) {
        subscription.request(maxValues == 0 ? .unlimited : .max(maxValues))
    }
    func send(_ value: Input) {
        recording.receive(value)
        stream.send(value)
        if recording.output.count == maxValues {
            send(completion: .finished)
        }
    }
    func send(completion: Subscribers.Completion<Failure>) {
        recording.receive(completion: completion)
        stream.send(completion: completion)
    }
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Input == S.Input {
        Record(recording: self.recording)
            .append(self.stream)
            .receive(subscriber: subscriber)
    }
}

(Disclaimer: I haven't considered thread-safety for this)

2 Likes