jmjauer
(Johannes Auer)
1
Hi,
I know there is the receive(on:) operator for choosing the scheduler for everything downstream, but how can I tell Combine which scheduler to run the first publisher on? Example:
Say I want to read a file from a background thread. Since there is no special publisher for that, I use a Future:
import Combine
import Foundation

extension Data {
    static func read(fileAt url: URL) -> Future<Data, Error> {
        return Future { promise in
            do {
                let data = try Data(contentsOf: url)
                promise(.success(data))
            } catch {
                promise(.failure(error))
            }
        }
    }
}
Now I can use this as a publisher of the data at that file URL, and it's easy to run the downstream on another thread:
let disposal = Data.read(fileAt: myFileUrl)
    .receive(on: myBackgroundthread)
    .decode(type: MyType.self, decoder: JSONDecoder())
    .sink(...)
But how can I tell Combine which scheduler to run the first publisher on? I know I could fulfil the promise inside the Future on another queue, but I wonder if there is a way to do it with the Combine API.
Greetings,
Johannes
Hello Johannes Auer,
The operator that you are looking for is subscribe(on:options:). It specifies the scheduler on which subscribe, cancel, and request operations are performed, which changes the execution context of the upstream work.
Data.read(fileAt: myFileUrl)
    .subscribe(on: globalScheduler)
    .receive(on: myBackgroundthread)
    .decode(type: MyType.self, decoder: JSONDecoder())
    .sink(...)
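To make the difference between the two operators concrete, here is a minimal sketch rather than a definitive recipe. The queue labels and variable names are hypothetical, and a Deferred { Just(...) } stands in for the file read so that the upstream work starts at subscription time. It assumes it is run as a top-level script on the main thread.

```swift
import Combine
import Foundation

// Hypothetical queues, named only for this illustration.
let upstreamQueue = DispatchQueue(label: "example.upstream")
let downstreamQueue = DispatchQueue(label: "example.downstream")

let done = DispatchSemaphore(value: 0)
var producedOnMain = true
var receivedOnMain = true

let cancellable = Deferred {
    // Evaluated when the subscription arrives, i.e. on `upstreamQueue`.
    Just(Thread.isMainThread)
}
.subscribe(on: upstreamQueue)    // where the subscription (upstream work) happens
.receive(on: downstreamQueue)    // where values are delivered downstream
.sink { value in
    producedOnMain = value
    receivedOnMain = Thread.isMainThread
    done.signal()
}

// Block the calling thread until the value has been delivered.
done.wait()
```

Both flags should end up false here: the value is produced on the queue given to subscribe(on:) and delivered on the queue given to receive(on:).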
A couple of side notes:
- Future is an eager publisher. This means the closure inside the Future runs as soon as read(fileAt url: URL) is called, not when a subscriber is attached, so subscribe(on:) alone does not move the file read off the calling thread. You can mitigate this by wrapping the Future in a Deferred publisher.
- func read(fileAt url: URL) -> Future<Data, Error>: I would suggest erasing to AnyPublisher so as to hide the implementation detail (nit).
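The eager versus lazy behaviour described in the first note can be observed with two counters. This is a small sketch under the assumption that it runs as a top-level script; the names eagerRuns, lazyRuns and lazyRunsBeforeSubscribe are hypothetical and exist only for the illustration.

```swift
import Combine

var eagerRuns = 0
var lazyRuns = 0

// Future: the closure runs as soon as the Future is created.
let eager = Future<Int, Never> { promise in
    eagerRuns += 1
    promise(.success(1))
}

// Deferred: the Future is only created, and its closure only run,
// once a subscriber attaches. Erasing hides that implementation detail.
let lazy: AnyPublisher<Int, Never> = Deferred {
    Future<Int, Never> { promise in
        lazyRuns += 1
        promise(.success(1))
    }
}
.eraseToAnyPublisher()

// No subscriber has been attached to either publisher yet.
let lazyRunsBeforeSubscribe = lazyRuns

// Attaching a subscriber triggers the deferred Future.
let cancellable = lazy.sink { _ in }
```

At the point where lazyRunsBeforeSubscribe is captured, the eager closure has already run once and the deferred one has not run at all.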
jmjauer
(Johannes Auer)
3
Hi Roshan,
thanks for your answer and suggestions. As you said, Future executes its closure immediately, so the code above only works when the Future is wrapped in a Deferred publisher. Seems like a lot of code for such a simple thing. This wrapper could be handy:
func Publish<S, Output, Failure>(on scheduler: S, value: @escaping () -> Result<Output, Failure>) -> AnyPublisher<Output, Failure> where S: Scheduler {
    return Future { promise in
        scheduler.schedule {
            let result = value()
            promise(result)
        }
    }.eraseToAnyPublisher()
}
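Because this helper is built on a bare Future, the work is scheduled as soon as the function is called. If lazy behaviour is wanted, one possible variant wraps the Future in Deferred. The sketch below is an assumption about how that could look, not part of Combine; the names deferredPublish, workerQueue and calls are hypothetical.

```swift
import Combine
import Foundation

// Hypothetical lazy variant of the helper: nothing is scheduled
// until a subscriber attaches.
func deferredPublish<S: Scheduler, Output, Failure: Error>(
    on scheduler: S,
    value: @escaping () -> Result<Output, Failure>
) -> AnyPublisher<Output, Failure> {
    Deferred {
        Future<Output, Failure> { promise in
            scheduler.schedule { promise(value()) }
        }
    }
    .eraseToAnyPublisher()
}

// Usage: count how often the work closure actually runs.
let workerQueue = DispatchQueue(label: "example.worker")
var calls = 0

let publisher = deferredPublish(on: workerQueue) { () -> Result<Int, Never> in
    calls += 1
    return .success(42)
}
let callsBeforeSubscribe = calls   // nothing has been scheduled yet

let done = DispatchSemaphore(value: 0)
var received: Int?
let cancellable = publisher.sink { value in
    received = value
    done.signal()
}
done.wait()
```

The trade-off is that every new subscriber schedules the work again, whereas a bare Future runs it once and replays the result.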
Greetings,
Johannes
Hey @jmjauer,
You can use a Deferred publisher, which should be more convenient:
extension Data {
    static func read(fileAt url: URL) -> AnyPublisher<Data, Error> {
        // Deferred creates the Future only when a subscriber attaches.
        return Deferred {
            Future { promise in
                do {
                    let data = try Data(contentsOf: url)
                    promise(.success(data))
                } catch {
                    promise(.failure(error))
                }
            }
        }.eraseToAnyPublisher()
    }
}
Deferred waits until it receives a subscriber, and subscribe(on:) makes that subscription happen on the given scheduler, so the usage should look like what @RoshanNindrai wrote:
Data.read(fileAt: myFileUrl)
    .subscribe(on: globalScheduler)
    .receive(on: myBackgroundthread)
    .decode(type: MyType.self, decoder: JSONDecoder())
    .sink(...)
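For reference, here is a self-contained sketch of the whole pipeline that can be run as a script. Everything specific in it is an assumption made for illustration: the Greeting type stands in for MyType, readFile(at:) is a free-function version of the Deferred-wrapped read, and the temporary file name and queue label are made up.

```swift
import Combine
import Foundation

// Hypothetical payload type, standing in for `MyType`.
struct Greeting: Decodable, Equatable {
    let message: String
}

// Deferred-wrapped file read; the read happens at subscription time.
func readFile(at url: URL) -> AnyPublisher<Data, Error> {
    Deferred {
        Future<Data, Error> { promise in
            promise(Result { try Data(contentsOf: url) })
        }
    }
    .eraseToAnyPublisher()
}

// Write a small JSON file to read back (hypothetical file name).
let url = FileManager.default.temporaryDirectory
    .appendingPathComponent("greeting-example.json")
try! Data(#"{"message":"hello"}"#.utf8).write(to: url)

let done = DispatchSemaphore(value: 0)
var decoded: Greeting?

let cancellable = readFile(at: url)
    .subscribe(on: DispatchQueue.global(qos: .utility))   // file read runs here
    .receive(on: DispatchQueue(label: "example.decode"))  // decoding and sink run here
    .decode(type: Greeting.self, decoder: JSONDecoder())
    .sink(
        receiveCompletion: { _ in done.signal() },
        receiveValue: { decoded = $0 }
    )

// Keep the script alive until the pipeline completes.
done.wait()
```

The cancellable has to be kept alive for as long as the result is wanted; in an app it would typically be stored in a property rather than a top-level constant.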