Combine: defining the scheduler for the *first* publisher

Hi,

I know there is the receive(on:) operator for defining the scheduler for all downstream publishers, but how can I tell Combine on which scheduler to execute the first publisher? Example:

Say I want to read a file from a background thread. Since there is no special publisher for that, I use a Future:

import Combine
import Foundation

extension Data {

    static func read(fileAt url: URL) -> Future<Data, Error> {
        return Future { promise in
            do {
                let data = try Data(contentsOf: url)
                promise(.success(data))
            } catch {
                promise(.failure(error))
            }
        }
    }

}

Now I can use this as a publisher of the data at that file URL, and it's easy to execute the downstream on another thread:

let disposal = Data.read(fileAt: myFileUrl)
            .receive(on: myBackgroundthread)
            .decode(type: MyType.self, decoder: JSONDecoder())
            .sink(...)
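To make the problem concrete, here is a small sketch (assuming an Apple platform where Combine is available) that records which thread does the work and which thread receives the value. A plain `Future<Int, Never>` stands in for `Data.read(fileAt:)`, and `DispatchQueue.global()` stands in for `myBackgroundthread`; both substitutions are assumptions, not the original code.

```swift
import Combine
import Foundation

// Records where the work ran and where the value was delivered.
var workOnMain: Bool?
var receivedOnMain: Bool?
let done = DispatchSemaphore(value: 0)

// A plain Future, like the one returned by Data.read(fileAt:): its closure
// runs right here, on the thread that creates it (the main thread in this script).
let future = Future<Int, Never> { promise in
    workOnMain = Thread.isMainThread
    promise(.success(1))
}

// receive(on:) only moves the downstream delivery to the background queue.
let cancellable = future
    .receive(on: DispatchQueue.global())
    .sink { _ in
        receivedOnMain = Thread.isMainThread
        done.signal()
    }

done.wait()
cancellable.cancel()
```

Only the delivery moves; by the time `receive(on:)` is applied, the Future's closure has already run on the creating thread.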

But how can I tell Combine on which scheduler to run the first publisher? I know I could run the promise inside the Future on another queue, but I wonder if there is a way to do it with the Combine API.

Greetings,
Johannes

Hello Johannes Auer,

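The operator that you are looking for is subscribe(on:options:). It changes the execution context of the upstream operations (subscribe, cancel and request), whereas receive(on:) only changes where downstream values are delivered.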

Data.read(fileAt: myFileUrl)
        .subscribe(on: globalScheduler)
        .receive(on: myBackgroundthread)
        .decode(type: MyType.self, decoder: JSONDecoder())
        .sink(...)

A couple of side notes:

  1. Future is an eager publisher. This means the closure passed to Future runs as soon as read(fileAt:) is called, not when a subscriber is attached. You can mitigate this by wrapping the Future in a Deferred publisher.

  2. Minor nit: rather than returning Future<Data, Error> from read(fileAt:), I would suggest erasing the type to AnyPublisher<Data, Error> to hide the implementation detail.
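A minimal sketch of the first side note, using `Int` values instead of file data (an assumption to keep it self-contained): it counts how often each work closure runs before and after anything subscribes.

```swift
import Combine

// Count how many times each publisher's work closure has run.
var eagerRuns = 0
var deferredRuns = 0
var values: [Int] = []
var cancellables = Set<AnyCancellable>()

// Future runs its closure inside its initializer, before anyone subscribes.
let eager = Future<Int, Never> { promise in
    eagerRuns += 1
    promise(.success(1))
}

// Deferred only builds the Future when a subscriber attaches
// (and builds a fresh one for every subscription).
let deferred = Deferred {
    Future<Int, Never> { promise in
        deferredRuns += 1
        promise(.success(2))
    }
}.eraseToAnyPublisher()

print(eagerRuns, deferredRuns) // 1 0: no subscribers yet, but the Future already ran

// Subscribing triggers the deferred work; the eager Future just replays its stored result.
deferred.sink { values.append($0) }.store(in: &cancellables)
eager.sink { values.append($0) }.store(in: &cancellables)

print(eagerRuns, deferredRuns) // 1 1
```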

Hi Roshan,

Thanks for your answer and suggestions. As you said, Future executes its closure immediately, so the code above only works when the Future is wrapped in a Deferred publisher. That seems like a lot of code for such a simple thing. This wrapper could be handy:

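import Combine

// Runs `value` on `scheduler` and publishes its result.
// Note: Future is still eager, so the work is scheduled as soon as Publish is called.
func Publish<S, Output, Failure>(on scheduler: S, value: @escaping () -> Result<Output, Failure>) -> AnyPublisher<Output, Failure> where S: Scheduler, Failure: Error {
    return Future { promise in
        scheduler.schedule {
            let result = value()
            promise(result)
        }
    }.eraseToAnyPublisher()
}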
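For comparison, a sketch of a lazy variant of this helper, assuming the Deferred approach from the side notes. `publishLazily(on:value:)` is a hypothetical name, not a Combine API. Because Future runs its closure immediately, the `Publish` helper above schedules the work as soon as it is called; wrapping the Future in Deferred and applying subscribe(on:) should instead delay the work until subscription and run it on the scheduler.

```swift
import Combine
import Foundation

// Hypothetical lazy helper: Deferred delays creating the Future until
// subscription, and subscribe(on:) makes that subscription (and so the work)
// happen on `scheduler`.
func publishLazily<S: Scheduler, Output, Failure: Error>(
    on scheduler: S,
    value: @escaping () -> Result<Output, Failure>
) -> AnyPublisher<Output, Failure> {
    Deferred {
        Future<Output, Failure> { promise in
            promise(value())
        }
    }
    .subscribe(on: scheduler)
    .eraseToAnyPublisher()
}

// Usage: nothing runs until sink is called.
var calls = 0
var workOnMain: Bool?
var received: Int?
let done = DispatchSemaphore(value: 0)

let publisher = publishLazily(on: DispatchQueue.global()) { () -> Result<Int, Never> in
    calls += 1
    workOnMain = Thread.isMainThread
    return .success(42)
}
let callsBeforeSubscribing = calls

let cancellable = publisher.sink { value in
    received = value
    done.signal()
}
done.wait()
cancellable.cancel()
```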

Greetings,
Johannes

Hey @jmjauer,
You can use the Deferred publisher; it should be a more convenient way:

import Combine
import Foundation

extension Data {
    static func read(fileAt url: URL) -> AnyPublisher<Data, Error> {
        return Deferred {
            Future<Data, Error> { promise in
                do {
                    let data = try Data(contentsOf: url)
                    promise(.success(data))
                } catch {
                    promise(.failure(error))
                }
            }
        }.eraseToAnyPublisher()
    }
}

Deferred waits until it receives a subscriber before creating the Future, so together with subscribe(on:) the file is read on the given scheduler. Usage should then look like what @RoshanNindrai wrote:

Data.read(fileAt: myFileUrl)
        .subscribe(on: globalScheduler)
        .receive(on: myBackgroundthread)
        .decode(type: MyType.self, decoder: JSONDecoder())
        .sink(...)
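An end-to-end sketch of this pipeline, assuming a hypothetical `MyType` with a single `name` field, a temporary JSON file, `DispatchQueue.global()` in place of `globalScheduler`, and a serial `DispatchQueue` labelled "decoding" in place of `myBackgroundthread`. The `Data.read(fileAt:)` extension repeats the Deferred version above, with `Result(catching:)` replacing the do/catch.

```swift
import Combine
import Foundation

// Hypothetical model type standing in for the thread's MyType.
struct MyType: Decodable, Equatable {
    let name: String
}

extension Data {
    // Deferred version of read(fileAt:): the file is only read on subscription.
    static func read(fileAt url: URL) -> AnyPublisher<Data, Error> {
        Deferred {
            Future<Data, Error> { promise in
                promise(Result { try Data(contentsOf: url) })
            }
        }
        .eraseToAnyPublisher()
    }
}

// Write a small JSON file to a temporary location so the example is self-contained.
let fileURL = FileManager.default.temporaryDirectory
    .appendingPathComponent("combine-subscribe-on-example.json")
try! Data(#"{"name":"Johannes"}"#.utf8).write(to: fileURL)

let done = DispatchSemaphore(value: 0)
var decoded: MyType?
var failure: Error?

let cancellable = Data.read(fileAt: fileURL)
    .subscribe(on: DispatchQueue.global())          // the file is read on a global queue
    .receive(on: DispatchQueue(label: "decoding"))  // decoding and sink run on this queue
    .decode(type: MyType.self, decoder: JSONDecoder())
    .sink(receiveCompletion: { completion in
        if case .failure(let error) = completion { failure = error }
        done.signal()
    }, receiveValue: { value in
        decoded = value
    })

done.wait()
cancellable.cancel()
```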