Even though the receive(on:) operator ensures the code is on the main thread, the compiler generates an error: Property 'uiProperty' isolated to global actor 'MainActor' can not be referenced from a non-isolated synchronous context.
Is there a way to get a combine pipeline to recognize that it's operating on the main thread and allow @MainActor code?
Yeah, I was the author of that post, and I don’t believe there is atm, nor do I really expect there to be as Combine is somewhat orthogonal to Swift Concurrency.
In this case, if you used a closure rather than a key path, and ensured you created this closure in a @MainActor context, you will get implicit main actor on that closure which will allow you to access it.
This gives me the warning: Converting function value of type '@MainActor (NSApplication) -> Bool' to '(NSApplication) -> Bool' loses global actor 'MainActor'. I've switched the code to use AsyncSequence instead (which has its own rough patches). Thanks for the advice!
Note in the above case, this doesn’t give you any win over simply calling the method as it will only receive one value.
In the above code, because the closure is constructed in a MainActor context, it is implicitly allowed to talk to a MainActor API even though it is passed into a context that is not guaranteed to be called on the main actor. If you explicitly annotate the closure itself, Swift will then need to verify the promises you made on the closure are maintained by the type system, which it cannot verify with combine.
The combine pipeline I actually use is more complex than a Just publisher, but it keeps the example simpler to explain. I noticed the effect of marking the function as @MainActor as well, but this is terribly confusing. For example, how would it handle this situation?
I have no idea what thread the map operator will execute on in this case, and there's no warnings or errors. Combine and async/await just don't seem to compose well in this case. Using AsyncSequence is an improvement.
let sequence = Just(NSApplication.shared)
.values
.map { await $0.isDarkMode }
for await next in sequence {
print(next)
}
The compiler will raise an error if you don't include the await within the map operator. I do wish there were a way to indicate that a sequence is context switching to the main thread. It almost certainly has performance implications that are only visible if you see inside the sequence.
Yeah, in the scenario you raise, with a global queue, it will indeed execute on the global queue even though the closure is implicitly MainActor as it’s constructed in the MainActor function.
I believe this was done to avoid Combine being practically unusable for a lot of cases, and the implicit allowance of access is not a guarantee it will execute on the main actor, merely that the closure is allowed to touch things that are MainActor isolated, and you need to maintain that manually.
AsyncSequence is revised to better support actual explicit Swift Concurrency features, I believe they’ve left the previous behaviour to avoid breaking all Combine code.
Super helpful thread. I was scratching my head today on which a Combine pipeline wasn't running on the MainActor despite class being annotated as such.
I am currently looking into figuring out if I can do something like .receiveOnMainActor() in the same way as .receive(on: DispatchQueue.main) but I'm really scratching my head how to not only execute everything that follows on the right actor but also conveying that information forward so all of the following functions in the publisher chain expect to be ran on @MainActor?.
Our workaround for the first part (executing within the right global actor) is to execute the block of code needed within the Combine pipeline within an async<->Combine adaptation helper block (ie: .asyncMap{..} below)
// REF: https://www.swiftbysundell.com/articles/calling-async-functions-within-a-combine-pipeline/
public func asyncMap<T>( _ transform:@escaping (Output) async throws -> T ) -> Publishers.FlatMap<Future<T, Error>, Self> {
flatMap { (value:Output) in
Future { promise in
Task {
do {
let output = try await transform( value )
promise( .success( output ))
} catch {
promise( .failure( error ))
}
}
}
}
}
Any idea how we could make it chain down to further Combine calls like .receive( on: ... ) would?
Ideally we would be able to adapt the global actor's executor to a Combine scheduler (ie: .receive( on: DomainActor.shared.unownedExecutor ).combineScheduler )
Any idea how we could write the .combineScheduler adapter?