Hello,
To adopt Swift Concurrency, we have to proceed in small steps, which means converting only parts of our code at a time. Moreover, in some cases we will have to keep using Combine or other reactive frameworks.
My question is: is there a way to wrap an async function call in a Combine publisher that compiles without warnings when strict concurrency checking is enabled?
My first idea was:
extension Future {
    convenience init(asyncFunction: @escaping () async throws -> Output) where Failure == Error {
        self.init { (promise: @escaping (Result<Output, any Error>) -> Void) in
            Task {
                do {
                    // Warning: Capture of 'asyncFunction' with non-sendable type '() async throws -> Output' in a `@Sendable` closure
                    let value = try await asyncFunction()
                    // Warning: Capture of 'promise' with non-sendable type '(Result<Output, any Error>) -> Void' in a `@Sendable` closure
                    promise(.success(value))
                } catch {
                    promise(.failure(error))
                }
            }
        }
    }
}
I can mark asyncFunction as @Sendable and let callers of this init take care of that, which removes the first warning. But I cannot get rid of the second one, because the promise closure type is defined by Combine.
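One escape hatch that suppresses (rather than solves) the second warning is to wrap the promise in an `@unchecked Sendable` box. The sketch below is only an illustration: `UncheckedSendableBox` is a hypothetical helper, not a Combine type, and it assumes that calling the promise from another thread is actually safe, which the compiler can no longer verify. I have not checked it against every toolchain version.

```swift
import Combine

// Hypothetical helper (not part of Combine): tells the compiler to trust us
// that `value` may cross concurrency domains. No checking is performed.
struct UncheckedSendableBox<Value>: @unchecked Sendable {
    let value: Value
}

extension Future {
    // `asyncFunction` is marked @Sendable so callers take responsibility for it.
    convenience init(
        asyncFunction: @escaping @Sendable () async throws -> Output
    ) where Failure == Error {
        self.init { promise in
            // Assumption: invoking the promise from the task's thread is safe.
            let box = UncheckedSendableBox(value: promise)
            Task {
                do {
                    let value = try await asyncFunction()
                    box.value(.success(value))
                } catch {
                    box.value(.failure(error))
                }
            }
        }
    }
}
```

The trade-off is that the safety claim moves from the compiler to the author of the box, so this is only reasonable if you are confident about how the wrapped closure behaves.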
I also tried:
extension AnyPublisher {
    init(asyncFunction: @escaping () async throws -> Output) where Failure == Error {
        let subject = PassthroughSubject<Output, Failure>()
        let task = Task {
            do {
                let value = try await asyncFunction()
                subject.send(value)
                subject.send(completion: .finished)
            } catch {
                subject.send(completion: .failure(error))
            }
        }
        self.init(subject.handleEvents(receiveCancel: { task.cancel() }))
    }
}
But PassthroughSubject is not Sendable either, so this produces warnings similar to the ones above.
Another idea would be to write a custom Subject that is Sendable, but I'm not sure how complicated that would be.
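For illustration, here is a minimal sketch of what such a wrapper might look like. It is not a full `Subject` conformance: `LockedSubject` is a hypothetical class that serializes access to a `PassthroughSubject` with a recursive lock and declares itself `@unchecked Sendable`, so the safety argument rests on the lock rather than on the compiler. Note that a `PassthroughSubject` does not replay values, so if the task finishes before anyone subscribes, the result is lost.

```swift
import Combine
import Foundation

// Hypothetical wrapper: all sends go through one lock, and we assert
// Sendable ourselves. A recursive lock is used so that a subscriber
// sending on the same subject from its callback does not deadlock.
final class LockedSubject<Output, Failure: Error>: @unchecked Sendable {
    private let lock = NSRecursiveLock()
    private let subject = PassthroughSubject<Output, Failure>()

    var publisher: AnyPublisher<Output, Failure> {
        subject.eraseToAnyPublisher()
    }

    func send(_ value: Output) {
        lock.lock()
        defer { lock.unlock() }
        subject.send(value)
    }

    func send(completion: Subscribers.Completion<Failure>) {
        lock.lock()
        defer { lock.unlock() }
        subject.send(completion: completion)
    }
}

extension AnyPublisher {
    init(
        asyncFunction: @escaping @Sendable () async throws -> Output
    ) where Failure == Error {
        let subject = LockedSubject<Output, Failure>()
        // Caveat: the task starts immediately, possibly before any subscriber
        // attaches, in which case the value is dropped.
        let task = Task {
            do {
                let value = try await asyncFunction()
                subject.send(value)
                subject.send(completion: .finished)
            } catch {
                subject.send(completion: .failure(error))
            }
        }
        self.init(subject.publisher.handleEvents(receiveCancel: { task.cancel() }))
    }
}
```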
It looks like the best way is to create a custom Publisher:
public struct TaskThrowingPublisher<Output>: Publisher {
    public typealias Failure = Error

    let priority: TaskPriority
    let work: @Sendable () async throws -> Output

    public init(
        priority: TaskPriority = .medium,
        work: @escaping @Sendable () async throws -> Output
    ) {
        self.priority = priority
        self.work = work
    }

    public func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
        let subscription = TaskSubscription(
            priority: priority,
            work: work,
            subscriber: subscriber
        )
        subscriber.receive(subscription: subscription)
        subscription.start()
    }

    final class TaskSubscription<Output, Downstream: Subscriber>: Combine.Subscription where Downstream.Input == Output, Downstream.Failure == Error {
        private var handle: Task<Output, Error>?
        private let priority: TaskPriority
        // Stored as @Sendable to match the initializer parameter.
        private let work: @Sendable () async throws -> Output
        private let subscriber: Downstream

        init(
            priority: TaskPriority,
            work: @escaping @Sendable () async throws -> Output,
            subscriber: Downstream
        ) {
            self.priority = priority
            self.work = work
            self.subscriber = subscriber
        }

        func start() {
            handle = Task(priority: priority) { [subscriber, work] in
                do {
                    let result = try await work()
                    try Task.checkCancellation()
                    _ = subscriber.receive(result)
                    subscriber.receive(completion: .finished)
                    return result
                } catch {
                    subscriber.receive(completion: .failure(error))
                    throw error
                }
            }
        }

        func request(_ demand: Subscribers.Demand) {}

        func cancel() {
            handle?.cancel()
        }
    }
}
Thank you for your response. Unfortunately, this does not solve the issue: there are still concurrency warnings. The Sendable closure passed to Task's init captures the non-Sendable subscriber. The whole Subscriber protocol would have to be marked as Sendable to make this work, which is highly unlikely; I don't think Combine can make such guarantees.
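If suppressing the diagnostic is acceptable, one possible variant is to make the subscription class itself `@unchecked Sendable` and capture `self` in the task instead of the subscriber. This is only a sketch: `UncheckedTaskSubscription` is a hypothetical name, the lock protects only the task handle, and it assumes the downstream subscriber tolerates being called from the task's thread, which nothing here verifies.

```swift
import Combine
import Foundation

// Hypothetical variant of the subscription above. The class asserts Sendable
// itself, so the compiler no longer inspects the captured subscriber.
final class UncheckedTaskSubscription<Downstream: Subscriber>: Subscription, @unchecked Sendable
where Downstream.Failure == Error {
    private let lock = NSLock()            // guards `handle` only
    private var handle: Task<Void, Never>?
    private let work: @Sendable () async throws -> Downstream.Input
    private let subscriber: Downstream

    init(
        work: @escaping @Sendable () async throws -> Downstream.Input,
        subscriber: Downstream
    ) {
        self.work = work
        self.subscriber = subscriber
    }

    func start() {
        // Capturing `self` is accepted because the class claims Sendable.
        let task = Task {
            do {
                let value = try await self.work()
                try Task.checkCancellation()
                _ = self.subscriber.receive(value)
                self.subscriber.receive(completion: .finished)
            } catch {
                self.subscriber.receive(completion: .failure(error))
            }
        }
        lock.lock()
        handle = task
        lock.unlock()
    }

    func request(_ demand: Subscribers.Demand) {}

    func cancel() {
        lock.lock()
        let task = handle
        handle = nil
        lock.unlock()
        task?.cancel()
    }
}
```

This does not make Subscriber Sendable in any real sense; it only moves the responsibility for thread safety from the compiler to whoever writes and uses the subscription.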