Hi,
I am trying to create a publisher that performs a network request when first subscribed and replays the result to each new subscriber. This can be done in RxSwift using the shareReplay operators.
I thought that using the share operator together with the buffer operator would do the trick. Here's my attempt:
struct Country: Codable {
let name: String
let nativeName: String
let capital: String
}
var cancellables = Set<AnyCancellable>()
let url = URL(string: "https://restcountries.eu/rest/v2/alpha/es")!
let countryPublisher = URLSession.shared.dataTaskPublisher(for: url)
.map { $0.data }
.decode(type: Country.self, decoder: JSONDecoder())
.print()
let sharedCountryPublisher = countryPublisher
.share()
.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
sharedCountryPublisher
.sink(receiveCompletion: { print("1: \($0)") },
receiveValue: { print("1: \($0)") })
.store(in: &cancellables)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
sharedCountryPublisher
.sink(receiveCompletion: { print("2: \($0)") },
receiveValue: { print("2: \($0)") })
.store(in: &cancellables)
}
Unfortunately the second subscriber doesn't get any buffered value, just a completion event. If you run this in a Swift playground, the output is as follows:
receive subscription: (Decode)
request unlimited
receive value: (Country(name: "Spain", nativeName: "España", capital: "Madrid"))
1: Country(name: "Spain", nativeName: "España", capital: "Madrid")
receive finished
1: finished
2: finished
Am I missing something? Is there any other way to get this behaviour with Combine?
EDIT: I was clearly misunderstanding the buffer operator, which serves as a mechanism to throttle back pressure and not for replaying values. I guess Combine doesn't have yet a replay or shareReplay operator. Any plans to support that in the future?
Thanks in advance,
Guille
3 Likes
Hi,
Last time I needed a shareReplay publisher, I struggled with the undocumented buffer method, as you did, until I gave up and just used the shareReplay that ships with Entwine.
I'm not sure I'm comfortable with this third-party library yet. I'm always scared when I look inside the guts of reactive libraries, they all look like they are doing premature abstraction, for unclear benefits. But who am I to judge? And it may drag you out of a pit.
Thanks for the suggestion, I will take a look at this library. For the moment, I am using a CurrentValueSubject to publish the results of the network operation I'd like to share with multiple subscribers.
JJJ
(Jens Jakob Jensen)
4
@gonzalezreal I've used your example to learn more about Combine 
You may already have a better solution, but if you define sharedCountryPublisher as
let sharedCountryPublisher = countryPublisher
.multicast(subject: ReplaySubject())
.autoconnect()
or even
extension Publisher {
func shareReplay(maxValues: Int = 0) -> AnyPublisher<Output, Failure> {
multicast(subject: ReplaySubject(maxValues: maxValues)).autoconnect().eraseToAnyPublisher()
}
}
let sharedCountryPublisher = countryPublisher.shareReplay()
then it should work using this ReplaySubject:
final class ReplaySubject<Input, Failure: Error>: Subject {
typealias Output = Input
private var recording = Record<Input, Failure>.Recording()
private let stream = PassthroughSubject<Input, Failure>()
private let maxValues: Int
init(maxValues: Int = 0) {
self.maxValues = maxValues
}
func send(subscription: Subscription) {
subscription.request(maxValues == 0 ? .unlimited : .max(maxValues))
}
func send(_ value: Input) {
recording.receive(value)
stream.send(value)
if recording.output.count == maxValues {
send(completion: .finished)
}
}
func send(completion: Subscribers.Completion<Failure>) {
recording.receive(completion: completion)
stream.send(completion: completion)
}
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Input == S.Input {
Record(recording: self.recording)
.append(self.stream)
.receive(subscriber: subscriber)
}
}
(Disclaimer: I haven't considered thread-safety for this)
2 Likes