How to mix async/await and Combine?

I'm trying to update some code to async/await and ran into some areas where I'm listening to publishers that kick off asynchronous tasks. The task was originally a Futures in a flatMap like this:

func requestAuthorization(_ value: String) -> Future<Bool, Never> {
    Future { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            promise(.success(true))
        }
    }
}

//

let relay1 = PassthroughSubject<String, Never>()
let relay2 = PassthroughSubject<String, Never>()

let subscription1 = Publishers.Merge(
    relay1,
    relay2
)
.debounce(for: 0.5, scheduler: RunLoop.main)
.flatMap { value in
    requestAuthorization(value)
}
.sink { granted in
    print("Granted: \(granted)")
}

//

relay1.send("Hello")
relay2.send("World!")

This works great, but my requestAuthorization function feels better as an async/await task since it just returns the status:

func requestAuthorizationAsync(_ value: String) async -> Bool {
    await withCheckedContinuation { continuation in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            continuation.resume(returning: true)
        }
    }
}

Trying to plug this back into my Combine chain doesn't work though.. I get a compilation error:

let subscription2 = Publishers.Merge(
    relay1,
    relay2
)
.debounce(for: 0.5, scheduler: RunLoop.main)
.flatMap { value in
    await requestAuthorizationAsync(value)
}
.sink { granted in
    print("Granted: \(granted)")
}

// error: CombineAsync.playground:50:2: error: no exact matches in call to instance method 'flatMap'
// .flatMap { value in

I even tried to place it in the sync itself but still a compile error:

let subscription2 = Publishers.Merge(
    relay1,
    relay2
)
.debounce(for: 0.5, scheduler: RunLoop.main)
.sink { value in
    let granted = await requestAuthorizationAsync(value)
    print("Granted: \(granted)")
}

// error: Cannot pass function of type '(String) async -> Void' to parameter expecting synchronous function type

Is there a way that Swift Concurrency and Combine work seamlessly together?

3 Likes

The flatMap example does not make sense like this because Future IIRC is just another publisher which will be flattened and resolved later down the road. However the sink example should work when you opt-in into the async context. The wwdc sessions all use async { ... } closure for that, but that is likely to change to Task { ... } in the near future. The idea of async is to allow a function execution to be suspended, hence why it does not make much sense in flatMap context but seems to be okay in sink as long as you convert it to async.

I believe it’s bad idea to use two different APIs together, I would write same logic but completely with using only AsyncAwait. The same thing is that I would not use Grand central dispatch API with Combine as you are in example

Thanks for the explanation @DevAndArtist , wrapping the await operation with an async closure in the sink worked. Initially I thought what if I wanted concurrent processes to participate in the Combine chain, but realized after you mentioned it doesn't really make sense and sink would be the spot.

@kyzmitch I totally agree with you, my example was really horrible and the GCD was not a real scenario but imagine accessing the network, file system, or an SDK in it. My example runs in Playground which is the reason for the GCD (and my laziness).

However, mixing async/await and Combine I believe is a valid use case but correct me if I'm not looking at this right. Because publishers are listeners to events, like a users location or a web socket, but async/await is handling other calls that are being performed some where else in the background. So I would think that Combine and async/await would overlap a lot. I'm surprised Combine is completely absent from WWDC 2021 :thinking:

1 Like

Combine has not received any public new API changes except for that some types now do conform to the Sendable protocol which is related to async/await, but AFAICT that‘s only to guarantee their thread safety.

From my perspective I would likely try to eliminate the GCD part of your function by using the new async/await and then wrap that async function inside the Future inside the flatMap operation.

That said, a Future is conceptually similar to an async function, as it eventually returns a result which contains a result value or a potential error (unless it‘s never).

func foo() -> Future<A, Error>
func foo() async throws -> A

func bar() -> Future<A, Never>
func bar() async -> A

func baz() -> Result<B, Error>
func baz() throws -> B

func qux() -> Result<B, Never>
func qux() -> B

I‘m not at my mac at the moment, nor have I tested any of the new stuff yet as I‘m still just watching the wwdc sessions, but I would try and extend Future with some convenient initializers (if possible) to accept an async function that either throws or not (concrete errors are not yet allowed as Swift still lacks the support of typed throws).

2 Likes

If you have both Futures and async functions in your code base, and need them to work seemlessly together, I think I would either create a convenience initializer on Future that takes an async function argument and use it in combination with Publisher.flatMap — or add an overload to flatMap (possibly a differently names function) in an extension to Publisher that accepts an async func as argument.

I'm looking for an answer to the question: in general, is it good to mix Combine with async/await??? Isn't it a bad smell? For example, I was spiking some solutions and came up with something like this

protocol Repo {
    func save(_ book: Book) async
    func get(by id: Book.ID) -> AnyPublisher<Book, Error>
}

in my UseCase I was doing some operations and in that flow I needed to save a book and then return the observable book from the repo, so it forces me to do something like this in my UseCase

func execute(with /*some parameters*/) async -> AnyPublisher<Book, Error>

doesn't it look weird???
In that case, all callers would need to await for the result from the UseCase and then attach their observers (sink etc.)

Yes, this looks very weird. It doesn't make much sense to return a publisher from a function asynchronously given that the publisher can already publish its value asynchronously. You shouldn't do this.

1 Like