Coordinating Combine's `receive(on:)` with `@MainActor`

I have an extension with a property marked as @MainActor. Something like this:

extension NSApplication {
    @MainActor 
    var isDarkMode: Bool {
        ...
    }
}

I'd like to use this property in a Combine pipeline, but it produces an error:

Just(NSApplication.shared)
    .receive(on: DispatchQueue.main)
    .map(\.isDarkMode)  // Error
    .assign(to: &somethingElse)

Even though the receive(on:) operator ensures the code runs on the main thread, the compiler generates an error: Property 'isDarkMode' isolated to global actor 'MainActor' can not be referenced from a non-isolated synchronous context.

Is there a way to get a Combine pipeline to recognize that it's operating on the main thread and allow @MainActor code?

Tried the usual places, but only found this (unanswered) question from several months ago: @MainActor access in closures created in Main Actor functions.

Yeah, I was the author of that post, and I don’t believe there is atm, nor do I really expect there to be as Combine is somewhat orthogonal to Swift Concurrency.

In this case, if you use a closure rather than a key path, and make sure the closure is created in a @MainActor context, the closure is implicitly treated as main-actor-isolated, which lets it access the property.

Thanks for the reply. I tried modifying the pipeline as you suggested:

Just(NSApplication.shared)
    .receive(on: DispatchQueue.main)
    .map { @MainActor app -> Bool in app.isDarkMode }  // Warning
    .assign(to: &somethingElse)

This gives me the warning: Converting function value of type '@MainActor (NSApplication) -> Bool' to '(NSApplication) -> Bool' loses global actor 'MainActor'. I've switched the code to use AsyncSequence instead (which has its own rough patches). Thanks for the advice!

Sorry, for reference I was referring to the function you create the closure in.

@MainActor
func myFunction() {
    Just(NSApplication.shared)
        .receive(on: DispatchQueue.main)
        .map { $0.isDarkMode }
        .assign(to: &somethingElse)  // assign(to:) returns Void, so there is no cancellable to store
}

Note that in the above case the pipeline gives you no advantage over simply reading the property directly, as it will only ever receive one value.

In the above code, because the closure is constructed in a MainActor context, it is implicitly allowed to talk to a MainActor API even though it is passed into a context that is not guaranteed to call it on the main actor. If you explicitly annotate the closure itself, Swift then needs to verify that the promises you made on the closure are upheld by the type system, which it cannot do with Combine.

The Combine pipeline I actually use is more complex than a Just publisher, but Just keeps the example simple. I noticed the effect of marking the function as @MainActor as well, but this is terribly confusing. For example, how would it handle this situation?

@MainActor
func function() {
    Just(NSApplication.shared)
        .receive(on: DispatchQueue.global())
        .map { $0.isDarkMode }
        .assign(to: &somethingElse)
}

I have no idea what thread the map operator will execute on in this case, and there are no warnings or errors. Combine and async/await just don't seem to compose well here. Using AsyncSequence is an improvement.

let sequence = Just(NSApplication.shared)
    .values
    .map { await $0.isDarkMode }

for await next in sequence {
    print(next)
}

The compiler will raise an error if you don't include the await within the map operator. I do wish there were a way to indicate that a sequence is context switching to the main thread. It almost certainly has performance implications that are only visible if you see inside the sequence.

Just(NSApplication.shared)
    .receive(on: DispatchQueue.main)
    .map { @MainActor(unsafe) in $0.isDarkMode }
    .assign(to: &somethingElse)

Yeah, in the scenario you raise, with a global queue, it will indeed execute on the global queue even though the closure is implicitly MainActor as it’s constructed in the MainActor function.

I believe this was done to avoid Combine being practically unusable for a lot of cases, and the implicit allowance of access is not a guarantee it will execute on the main actor, merely that the closure is allowed to touch things that are MainActor isolated, and you need to maintain that manually.

AsyncSequence was designed to support the explicit Swift Concurrency features properly. I believe they've left the previous behaviour in place for Combine to avoid breaking existing Combine code.
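
To illustrate the point above that the implicit access is not a guarantee and has to be maintained manually, here is a minimal sketch that checks the queue at runtime inside the closure. The AppearanceModel type and its values property are hypothetical and only there to make the example self-contained; dispatchPrecondition is the standard Dispatch call.

```swift
import Combine
import Foundation

// Hypothetical model type, used only to give the closures a MainActor context.
@MainActor
final class AppearanceModel {
    private(set) var values: [Int] = []
    private var cancellables = Set<AnyCancellable>()

    func start() {
        Just(21)
            .receive(on: DispatchQueue.main)
            .map { value -> Int in
                // The compiler only *allows* MainActor access here; it does not
                // enforce it. This check traps if the closure runs on another queue,
                // for example after someone changes receive(on:) to a global queue.
                dispatchPrecondition(condition: .onQueue(.main))
                return value * 2
            }
            .sink { [weak self] in self?.values.append($0) }
            .store(in: &cancellables)
    }
}
```

With receive(on: DispatchQueue.main) in place the check passes; swapping in DispatchQueue.global() would still compile, but the precondition would then fail at runtime instead of the code silently touching main-actor state from a background thread.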

Super helpful thread. I was scratching my head today over why a Combine pipeline wasn't running on the MainActor despite the class being annotated as such.

I am currently trying to figure out whether I can write something like .receiveOnMainActor(), analogous to .receive(on: DispatchQueue.main). What I can't work out is how to not only execute everything that follows on the right actor, but also convey that information forward so that all of the following operators in the publisher chain are known to run on @MainActor.

It seems to be impossible?
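
A partial workaround can be sketched for the end of the chain only. The helper below, sinkOnMainActor, is a hypothetical name and not part of Combine. It assumes a Swift 5.9 or later toolchain (for MainActor.assumeIsolated) and Swift 5 language mode. It hops to the main queue and then hands the value to a closure that the compiler treats as @MainActor, so the isolation is visible in the closure's type. It does not make the intermediate operators isolated.

```swift
import Combine
import Foundation

extension Publisher where Failure == Never {
    /// Hypothetical helper: delivers values to a closure typed as @MainActor.
    func sinkOnMainActor(
        _ receiveValue: @escaping @MainActor (Output) -> Void
    ) -> AnyCancellable {
        receive(on: DispatchQueue.main)
            .sink { value in
                // Valid only because of the receive(on:) hop directly above.
                // assumeIsolated traps at runtime if the current executor
                // is not the main actor's executor.
                MainActor.assumeIsolated {
                    receiveValue(value)
                }
            }
    }
}
```

Inside the closure passed to sinkOnMainActor, main-actor-isolated members such as app.isDarkMode can be read without await, and the check happens at runtime rather than being taken on trust.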

Our workaround for the first part (executing within the right global actor) is to run the code that needs the actor inside an async<->Combine adaptation helper (i.e. the .asyncMap { ... } block below):

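somePublisher.asyncMap { (widgets:[Widget]) in
	await _patch( widgets: widgets )
}
.sink( receiveCompletion: { _ in }, receiveValue: { _ in } ) // a subscriber is needed before .store( in: ) is available
.store( in: &cancellables )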
		
		
@DomainActor func _patch( widgets:[Widget] ) -> [Widget] {
	widgets.map { (widget:Widget) in
		context.patch( widget: widget )
	}
}

where the helper async adapter function is:

// REF: https://www.swiftbysundell.com/articles/calling-async-functions-within-a-combine-pipeline/
// Declared in an extension on Publisher so that Output and Self resolve.
extension Publisher {
	public func asyncMap<T>( _ transform:@escaping (Output) async throws -> T ) -> Publishers.FlatMap<Future<T, Error>, Self> {
		flatMap { (value:Output) in
			Future { promise in
				// Run the async transform in a new Task and forward its result to the Future.
				Task {
					do {
						let output = try await transform( value )
						promise( .success( output ))
					} catch {
						promise( .failure( error ))
					}
				}
			}
		}
	}
}

Any idea how we could make it chain down to further Combine calls like .receive( on: ... ) would?

Ideally we would be able to adapt the global actor's executor to a Combine scheduler (i.e. .receive( on: DomainActor.shared.unownedExecutor.combineScheduler ))

Any idea how we could write the .combineScheduler adapter?
