Combine: Convert blocking call to publisher

Hey,
I have an old callback API with a newValue callback handler that emits one value at a time, and a completion callback that receives an Error, or nil on success.

Unfortunately, calling collecting blocks until the API has sent all values!

func collecting<T>(newValue: @escaping (T) -> Void, completionHandler: @escaping (Error?) -> Void) { ... }

func publisher<T>(_ type: T.Type) -> AnyPublisher<T, Error> {
    let subject = PassthroughSubject<T, Error>()
        
    collecting(newValue: { (value: T) in
        subject.send(value)
    }, completionHandler: { (error: Error?) in
        if let error = error {
            subject.send(completion: .failure(error))
        } else {
            subject.send(completion: .finished)
        }
    })
    return subject.eraseToAnyPublisher()
}

The code above does not work: because collecting blocks, the return statement is only reached after the subject has already sent finished, so a subscriber receives only the finished completion and none of the values.

How can such an API be implemented in Combine?

I don't want to use DispatchQueue.async, because scheduling should be handled by Combine.

The collecting function blocks the calling thread until it has delivered all values to the newValue and completion closures. Therefore, nothing you do with the PassthroughSubject is going to change that fact. You must call the collecting function on a background thread so that the background thread is blocked instead:

func publisher<T>(_ type: T.Type) -> AnyPublisher<T, Error> {
    
    let subject = PassthroughSubject<T, Error>()
    
    DispatchQueue.global().async {
        
        collecting(newValue: { (value: T) -> Void in
            subject.send(value)
        }, completionHandler: { (error: Error?) -> Void in
            if let error = error {
                subject.send(completion: .failure(error))
            } else {
                subject.send(completion: .finished)
            }
        })
        
    }
    
    return subject
        .eraseToAnyPublisher()
    
}

Yes, this is my current workaround too. But I don't like it: when you consume the publisher through the values property in Swift 5.5, thread handling is based on the new async dispatcher, and this publisher switches the dispatcher on its own.
I hoped I could use Future or some kind of flatMap, but I didn't get it to work.

Future only publishes a single element. flatMap transforms the output of an upstream publisher into a new publisher.

This makes absolutely no sense. What are you trying to say?

func consume<T>(_ type: T.Type) {
    Task { // switching the thread, controlled by the consumer, not the publisher
        for try await newValue in publisher(type).values {
            // handle newValue
        }
    }
}

I don't know what happens internally when I switch the dispatcher in the consumer. It sounds like a code smell to me.

I don't know what a dispatcher or a consumer is.

If you're going to use async/await, then why are you creating a Combine wrapper for your callback-based API?

In this case, the combinations you are looking for are either Deferred + Future (for one-shot things) or Deferred + PassthroughSubject. Deferred creates its publisher upon subscription, so your collecting call only runs once a subscriber is attached. For more details, see the documentation for Deferred and Future.

func publisher<T>(_ type: T.Type) -> AnyPublisher<T, Error> {
    Deferred<PassthroughSubject<T, Error>> {
      let subject = PassthroughSubject<T, Error>()
      collecting { value in
          subject.send(value)
      } completionHandler: { error in
          if let error = error {
              subject.send(completion: .failure(error))
          } else {
              subject.send(completion: .finished)
          }
      }
      return subject
    }.eraseToAnyPublisher()
}
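For the one-shot case, Deferred + Future can be sketched as follows. oneShotPublisher and its callback shape are hypothetical (the thread's collecting API emits many values, which a Future cannot model). Future runs its closure as soon as it is initialized, which is why it is wrapped in Deferred:

```swift
import Combine

// Hypothetical one-shot callback shape: `work` calls its completion exactly once.
// Future models a single value; Deferred postpones creating the Future until
// someone subscribes, because Future runs its closure eagerly at init time.
func oneShotPublisher<T>(
    _ work: @escaping (@escaping (Result<T, Error>) -> Void) -> Void
) -> AnyPublisher<T, Error> {
    Deferred {
        Future<T, Error> { promise in
            work(promise) // promise has type (Result<T, Error>) -> Void
        }
    }
    .eraseToAnyPublisher()
}
```

Because of Deferred, each new subscriber triggers its own run of the one-shot work, and nothing runs before the first subscription.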

@Peter-Schorn I missed the great backporting of async/await. Thanks for the hint, I will switch to AsyncSequence.
But now I'm confused: how does AsyncSequence support backpressure?

// 3rd-party collecting function, as exposed to Swift
protocol Cancelable {
    func cancel()
}
func collecting(_ flow: Flow, blockingAction: @escaping (Any) -> Void, onCompletion: @escaping (Error?) -> Void) -> Cancelable { ... }


// my implementation
let s = AsyncStream<T> { cont in
    let job = FlowsKt.collecting(self) { value in
        print(value as! T)
        cont.yield(value as! T) // this call does not block and requests without back pressure...
        // await cont.yield(value as! T) // would be perfect, but yield is not async
    } onCompletion: { error in
        if let error = error {
            fatalError(error.localizedDescription)
        } else {
            cont.finish()
        }
    }
    cont.onTermination = { @Sendable termination in
        if case .cancelled = termination {
            job.cancel()
        }
    }
}

for await newValue in s {
    try? await Task.sleep(nanoseconds: 5_000_000) // simulate a slow consumer
}

This won't work, because collecting produces an infinite stream: yield never waits for the consumer, so the AsyncStream buffer (unbounded by default) keeps growing.

It looks like the only option is to use publishers with DispatchQueue.async { }, because I need backpressure. The alternative, AsyncStream, does not support it, because yield does not wait until the value is consumed.
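That yield never waits can be checked without any Combine or Kotlin code. The sketch below (drainAfterEagerYields is a made-up helper name) yields every value before anyone iterates: with the default .unbounded policy everything piles up in the buffer, and a bounded policy drops values instead of making the producer wait, so neither gives backpressure:

```swift
// Yields 1...count before anyone iterates, then drains the stream.
// yield(_:) returns immediately in every case; it never suspends the producer.
func drainAfterEagerYields(
    count: Int,
    policy: AsyncStream<Int>.Continuation.BufferingPolicy
) async -> [Int] {
    var continuation: AsyncStream<Int>.Continuation?
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation = $0 }
    guard let cont = continuation else { return [] }
    for i in 1...count {
        cont.yield(i) // buffered or dropped according to `policy`, never awaited
    }
    cont.finish() // already-buffered elements are still delivered before the end
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

With five values, .unbounded delivers [1, 2, 3, 4, 5], .bufferingNewest(2) delivers [4, 5], and .bufferingOldest(2) delivers [1, 2].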

Wrap your whole thing with Deferred, so that your setup code runs only when you have a subscriber ready to receive values.

Right now, you have a non-buffering PassthroughSubject (akin to BroadcastChannel or SharedFlow in Kotlin Coroutines with zero buffer capacity) where you start forwarding values to it immediately, even before the PassthroughSubject is returned to the caller. So the caller won't even have a chance to receive any emitted values, if collecting(_:_:) happens to emit some or all of them synchronously.

A Publisher normally behaves like a Flow in Kotlin Coroutines: it has "cold" semantics and starts a new instance only when a subscriber/collector is attached.

func publisher<T>(_ type: T.Type) -> AnyPublisher<T, Error> {
    Deferred {
        let subject = PassthroughSubject<T, Error>()
        FlowsKt.collecting(/** ... **/)
        return subject
    }.eraseToAnyPublisher()
}

If you have a truly infinite sequence that must be backpressured, neither AsyncStream nor PassthroughSubject is capable of that.

If anything, you will first have to adjust FlowsKt.collecting so that it can support backpressure. This likely means a continuation object should be passed alongside the value, so that:

  1. the (supposedly) underlying Flow can suspend and wait on the continuation; while
  2. your Swift code can choose when to hand back control by eventually resuming the continuation.

Then based on the updated FlowsKt.collecting, you will have to write either a custom Publisher that deals with Combine's integer-based demand management, or a custom AsyncSequence iterator.
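The final iterator isn't shown in the thread, so this is only a sketch of the idea under stated assumptions. It assumes a hypothetical acknowledgement-based callback (AckProducer): the upstream hands over one value together with a resume closure and waits until resume() is called, roughly the continuation shape described above. BackpressuredSequence and countingProducer are made-up names, not FlowsKt or Swift API. Each next() call acknowledges the previous value, so the producer is never more than one element ahead of the consumer:

```swift
import Foundation

// HYPOTHETICAL callback shape (not the real FlowsKt API): the producer calls
// onValue(value, resume) and emits nothing more until resume() is called.
typealias AckProducer<Element> = (
    @escaping (Element, @escaping () -> Void) -> Void, // onValue(value, resume)
    @escaping (Error?) -> Void                          // onCompletion(error)
) -> Void

struct BackpressuredSequence<Element>: AsyncSequence {
    let produce: AckProducer<Element>

    func makeAsyncIterator() -> Iterator { Iterator(state: State(produce: produce)) }

    struct Iterator: AsyncIteratorProtocol {
        let state: State
        func next() async throws -> Element? { try await state.next() }
    }

    final class State {
        private let lock = NSLock()
        private let produce: AckProducer<Element>
        private var started = false
        private var finished = false
        private var pendingResume: (() -> Void)?  // acknowledges the last delivered value
        private var waiter: CheckedContinuation<Element?, Error>?

        init(produce: @escaping AckProducer<Element>) { self.produce = produce }

        func next() async throws -> Element? {
            try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Element?, Error>) in
                lock.lock()
                if finished {
                    lock.unlock()
                    cont.resume(returning: nil)
                    return
                }
                waiter = cont
                let ack = pendingResume
                pendingResume = nil
                let isFirstCall = !started
                started = true
                lock.unlock()
                if isFirstCall {
                    // Start the producer lazily on the first request (cold semantics).
                    produce({ value, resume in self.deliver(value, resume) },
                            { error in self.complete(error) })
                } else {
                    // The consumer wants more: let the producer emit the next value.
                    ack?()
                }
            }
        }

        private func deliver(_ value: Element, _ resume: @escaping () -> Void) {
            lock.lock()
            let cont = waiter
            waiter = nil
            pendingResume = resume
            lock.unlock()
            cont?.resume(returning: value)
        }

        private func complete(_ error: Error?) {
            lock.lock()
            let cont = waiter
            waiter = nil
            finished = true
            pendingResume = nil
            lock.unlock()
            if let error = error {
                cont?.resume(throwing: error)
            } else {
                cont?.resume(returning: nil)
            }
        }
    }
}

// HYPOTHETICAL stand-in producer: emits 1...limit, one value per acknowledgement.
func countingProducer(upTo limit: Int, emitted: @escaping (Int) -> Void = { _ in }) -> AckProducer<Int> {
    return { onValue, onCompletion in
        func emit(_ i: Int) {
            guard i <= limit else { onCompletion(nil); return }
            emitted(i)
            onValue(i) { emit(i + 1) }
        }
        emit(1)
    }
}
```

Cancellation is not handled here: if the consumer stops iterating, the last resume closure is never called and the producer stays suspended. A fuller bridge would forward cancellation upstream, for example to Cancelable.cancel().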


@Anders_Ha Thanks for the hint! I tried a custom AsyncSequence iterator (and a custom Publisher implementation) and it works with back pressure.