Is it fair to declare Combine's AnyPublisher as unchecked sendable as long as output & error types are sendable?

Right now Combine publishers are not sendable, so we get warnings (or error if Swift 6 language mode is enabled) if we pass a publisher across actor boundaries.

Is there any downside in doing something like:

extension AnyPublisher: @retroactive @unchecked Sendable where Failure == Never, Output: Sendable {}

Since AnyPublisher is can't update any data, so accessing it from different threads should be fine.

I saw Xcode suggested Add '@preconcurrency' to treat 'Sendable'-related errors from module 'Combine' as warnings but just adding unchecked to a smaller scope feels safer.

2 Likes

Consider this setup:

open class NonSendable {
    var value: Int
    init(value: Int) {
        self.value = value
    }
}

// thread/task/actor A
// publish some A-isolated state
let subject = PassthroughSubject<NonSendable, Never>()
let publisher: AnyPublisher<Int, Never> = subject
    .buffer(1, prefetch: .keepFull, whenFull: .dropOldest)
    .map { $0.value }
    .eraseToAnyPublisher()

// send to thread/task/actor B
let notSendable = NotSendable(value: 0)
let cancellable = publisher
    .sink {
        // update some B-isolated state
        notSendable.value = $0
    }
  • Int is Sendable, so publisher would be Sendable with your extension
  • PassthroughSubject delivers elements synchronously as they're sent, so long as there's downstream demand
  • buffer creates upstream demand, but supplies elements only in response to downstream demand
  • map operates synchronously when elements are delivered
  • sink's closure is not marked @Sendable, so it can access B-isolated state

Now there are two situations:

  • Nothing sent through the subject yet. Creating the sink on B creates demand, which propagates through map to buffer, but there's no values.
    • Now A publishes a NonSendable value
    • which subject delivers synchronously to buffer
    • buffer delivers synchronously to map
    • map calls .value (safe! we're on A) and delivers it synchronously to sink
    • sink updates B-isolated state (unsafe! we're still on A)
  • Before publisher gets sent (so there's no sink and no demand)
    • A publishes a NonSendable value
    • which subject delivers synchronously to buffer
    • which buffer holds, until there's downstream demand
    • creating the sink on B creates demand, which propagates through map to buffer
    • buffer synchronously delivers the buffered value to map
    • map calls .value (unsafe! we're on B) and delivers it synchronously to sink
    • sink updates B-isolated state (safe! we're still on B)
1 Like

Thanks @KeithBauerANZ that's a very good point! I guess we don't even need the NonSendable class here to expose this issue, as long as in Actor B we have publisher.sink { ... } then the code inside this sink could access isolated properties of B from different threads/actors depending on who publishes to the subject backing the publisher.

What would be a better solution?

Combine won't ever be safe with swift concurrency; the question is what you want to do about that?

And the problems are twofold —

  • warnings where "conceptually" there shouldn't be some, like sending a publisher to another thread, which you're supposed to do, it just isn't safe.
  • no warnings where "conceptually" there should be, like sink's closure being non-@Sendable, when it usually is sent.

In case it wasn't clear from the above, you don't even need to send a Publisher to get into trouble:

// thread/task/actor A
// publish some A-isolated state
let subject = PassthroughSubject<NonSendable, Never>()
let publisher: AnyPublisher<Int, Never> = subject
    .receive(on: DispatchQueue.global())
    .sink {
        // access A-isolated state on the global queue
        $0.value
    }
    .store(in: &cancellables)

Here the publisher doesn't leave A, but the sink is run elsewhere.

1 Like

I've kinda tried to avoid answering the question directly, because I don't think there's a clear-cut answer to your problem.

I believe strongly that the long-term answer is to replace Combine with AsyncSequence. But in the short-term, you probably have a mountain of Combine code you need to live with, and even with minimal concurrency checking in Swift 6's Swift 5 language mode, you're probably getting warnings that aren't helping you.

I really think it's up to you how to deal with them —

  • @preconcurrency import, if it works for you,
  • adding retroactive Sendable conformances, even though they're not strictly true, 'cos you still have to use the types as if they are,
  • leaving the warnings as signposts to dangerous code, so you know where to start looking when things explode, or when you're ready to start migrating off Combine
1 Like

thanks a lot @KeithBauerANZ yeah I think that's a very good summary:
(1) short-term, @preconcurrency import Combine
(2) long-term, migrated Combine to AsyncSequence

@KeithBauerANZ you convinced me yesterday on the need to switch to AsyncSequence, but I'm revisiting this a little bit and had some second thoughts.

While AsyncSequence sounds nice in theory, there are still many unsolved problems:
(1) AsyncStream doesn't allow multiple observers, so it can't provide the same functionality as CurrentValueSubject or PassthroughSubject: 'Consuming an AsyncStream from multiple Tasks' Redux - #2 by FranzBusch or Swift Async Algorithms Proposal: Broadcast (Previously Shared) (many posts and many PRs not none got merged)
(2) lack of flatMapLatest/switchToLatest, it was proposed but not officially added: PrePitch: Introduce `(flat)mapLatest` Algorithms
(3) lack of the equivalent of .store(in: &cancellables)
there might still be limitations I haven't thought about yet.

There might be working solutions to those but then haven't reached consensus to get into even apple/swift-async-algorithms yet (let alone built-in framework), so the confidence isn't very high.

Now looking back at the Combine unsafe issue from this thread, you are right that unchecked sendable subject or publisher causes this loophole:

,

But we can say that directly passing a strong reference of self into sink isn't right anyways, i.e. retain cycle & memory leak. Let's say we have some linter to prevent this, so self has to be captured before passing into sink, then Swift 6 compiler actually warns us about the issue as expected:

which forces us to change it to:


then it should be safe.

What do you think?

FWIW, you can probably find other issues with replacing Combine publishers with AsyncSequences:

  • If you use a Task to listen to this never-ending AsyncSequences, it's very easy to inadvertently capture self, as Task.init's closure can implicitly capture self (the closure is annotated with @_implicitSelfCapture, see the source here, reasoning for the decision in SE-0304).
  • Because of this, it's very easy to extend the lifetime of self for the duration of the task (forever, if dealing with a never-ending sequence), so handling task through something like store(in: &cancellables) is hard to get right, as the AnyCancellable's deinit may never be called.
  • Not only doesn't AsyncStream allow for multiple observers, but the AsyncSequence protocol in general doesn't guarantee that multiple observers are supported. So it's hard to know at a glance if a for await loop (which only requires conformance to AsyncSequence) is iterating over a type supporting multiple observers or not.

I realize this may not be useful to you short term, but IMHO the best approach to the Swift 6 + Combine problem is not to replace every Combine publisher with an AsyncSequence. Instead, it may be useful to ask the question: Is there something else in the language now to achieve the goal I originally used Combine's publishers for?

In my experience, one major reason pushing for heavy use of Combine (or RxSwift) in projects was to make UI react to changes in the data source. Currently (at least on Apple platforms) there's some better alternatives for this goal, like SwiftUI + @Observable (or even ObservableObject), that remove the need of manually handling streams of changes.

Also, to interface with streams of changes, SwiftUI has a nice .task { ... } modifier that is automatically cancelled (ending any for await loops started within it [EDIT: depends on the underlying type of the sequence, see post below]) when the view disappears, instead of when some object is deallocated like AnyCancellables do (which makes the issue with self capture in never-ending AsyncSequences a non-issue).

For non-UI things, this is trickier and completely unique to every project. Lack of an equivalent to .store(in: &cancellables) can perhaps be elided by manually storing a reference to never-ending Tasks and cancelling them based on an external signal (for example, when a service is terminated), which may be more a more robust solution than relying on self being deallocated.

Just my 2¢.

3 Likes

Small but important clarification: AsyncSequence on its own does not guarantee that iteration of a for await loops ends when the iterating task gets cancelled. The sequence's AsyncIterator should explicitly react to cancellation, e.g. by using withTaskCancellationHandler in its next() implementation.

Now, AsyncStream and AsyncThrowingStream do exactly this and thus end iteration promptly when the iterating task gets cancelled (Reference: stdlib source code). But if you're working with other AsyncSequences, you should check how they handle cancellation.

6 Likes

I agree with the general direction of thinking about alternative patterns for Combine as it will probably never support Concurrency.

But for certain use cases it’s also hard to replace. Mainly it’s cases where there is a broadcasting source (eg current user, current location, etc) and they are consumed by both UI and non-UI, and often involves some business logic not on main thread, and sometimes it’s necessary to combine the current value from multiple sources.

So while alternatives are explored, what do you think about the thread safety of using Combine with Swift 6 concurrency by relying on:

  1. Only mark read only publisher with sendable output as unchecked sendable.
  2. Use linter to require every closure passed to map, flatMap, sink, etc must capture self if self is used, which seems to mostly address the issue of leaking actor access to an unsafe thread.
    Did I miss something in the Playground experiment in my last post?

Edit:
I saw another loophole is nonsendable objects on the actor, even when they are captured, Swift compiler doesn't protect against the unsafe usage:


so we'd also need some other rules:
3. don't allow map/flatMap/sink etc to capture nonsendable types. Capturing self (which is sendable) and access their unsinkable property is fine, see errors below:

  1. also unowned self shouldn't be allowed

I agree with the others in this thread, "replace Combine with AsyncSequence" will not be 1:1; in particular you don't want an object to own Tasks that capture itself. And yes, subjects are tricky because the backpressure situation is much stricter than Combine.

But I very much doubt, even with a semantics-aware (rather than just syntax-aware) linter that you could make usage of Combine safe.

I’m also very interested around this topic. Currently AsyncStream + consuming Task is not a trivial topic on its own. Mixing Combine with concurrency neither good too. Sometimes I find (unfortunately) simpler to convert async/await to Publisher to stick to Combine world instead.

It’s also a bit disappointing that GitHub - apple/swift-async-algorithms: Async Algorithms for Swift has faced stagnation since 1.0.

I wish that Swift would make it as simple as Kotlin’s Flow, StateFlow … StateFlow

3 Likes