How to correctly convert from an async function to Combine (or any other framework)

Hello,

In order to adapt Swift Concurrency, we have to do it in small steps and it means converting just parts of our code. Moreover, in some cases we will have to continue using Combine/other reactive frameworks.

My question is, is there a way to convert async function call to Combine that works without warnings with strict concurrency enabled?

My first idea was:

extension Future {
    convenience init(asyncFunction: @escaping () async throws -> Output) where Failure == Error {
        self.init { (promise: @escaping (Result<Output, any Error>) -> Void) in
            Task {
                do {
                    let value = try await asyncFunction() // Capture of 'asyncFunction' with non-sendable type '() async throws -> Output' in a `@Sendable` closure
                    promise(.success(value)) // Capture of 'promise' with non-sendable type '(Result<Output, any Error>) -> Void' in a `@Sendable` closure
                } catch {
                    promise(.failure(error))
                }
            }
        }
    }
}

I can mark asyncFunction with Sendable and let users of this init take care of that. But I cannot get rid of the second warning.

I also tried:

extension AnyPublisher {
    init(asyncFunction: @escaping () async throws -> Output) where Failure == Error {
        let subject = PassthroughSubject<Output, Failure>()
        let task = Task {
            do {
                let value = try await asyncFunction()
                subject.send(value)
                subject.send(completion: .finished)
            } catch {
                subject.send(completion: .failure(error))
            }
        }
        self.init(subject.handleEvents(receiveCancel: { task.cancel() }))
    }
}

But PassthroughSubject is also not Sendable and gives similar warnings as above.

The other idea would be to write custom Subject that would be Sendable but I'm not sure how complicated this would be.

1 Like

It looks like the best way is to create a custom Publisher

public struct TaskThrowingPublisher<Output>: Publisher {

    public typealias Failure = Error

    let priority: TaskPriority
    let work: @Sendable () async throws -> Output

    public init(
        priority: TaskPriority = .medium,
        work: @escaping @Sendable () async throws -> Output
    ) {
        self.priority = priority
        self.work = work
    }

    public func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
        let subscription = TaskSubscription(
            priority: priority,
            work: work,
            subscriber: subscriber
        )
        subscriber.receive(subscription: subscription)
        subscription.start()
    }

    final class TaskSubscription<Output, Downstream: Subscriber>: Combine.Subscription where Downstream.Input == Output, Downstream.Failure == Error {

        private var handle: Task<Output, Error>?
        private let priority: TaskPriority
        private let work: () async throws -> Output
        private let subscriber: Downstream

        init(
            priority: TaskPriority,
            work: @escaping @Sendable () async throws -> Output,
            subscriber: Downstream
        ) {
            self.priority = priority
            self.work = work
            self.subscriber = subscriber
        }

        func start() {
            handle = Task(priority: priority) { [subscriber, work] in
                do {
                    let result = try await work()
                    try Task.checkCancellation()
                     _ = subscriber.receive(result)
                     subscriber.receive(completion: .finished)
                    return result
                } catch {
                    subscriber.receive(completion: .failure(error))
                    throw error
                }
            }
        }

        func request(_ demand: Subscribers.Demand) {}

        func cancel() {
            handle?.cancel()
        }
    }
}

Thank you for your response. Unfortunately, it looks like there is no way to solve this issue, there are still concurrency warnings. Sendable closure passed to Task's init captures non-sendable subscriber. The whole Subscriber protocol would have to marked as Sendable to make it work. But this is highly unlikely, I don't think Combine can make such guarantees.

1 Like