Right now Combine publishers are not sendable, so we get warnings (or error if Swift 6 language mode is enabled) if we pass a publisher across actor boundaries.
Since AnyPublisher is can't update any data, so accessing it from different threads should be fine.
I saw Xcode suggested Add '@preconcurrency' to treat 'Sendable'-related errors from module 'Combine' as warnings but just adding unchecked to a smaller scope feels safer.
open class NonSendable {
var value: Int
init(value: Int) {
self.value = value
}
}
// thread/task/actor A
// publish some A-isolated state
let subject = PassthroughSubject<NonSendable, Never>()
let publisher: AnyPublisher<Int, Never> = subject
.buffer(1, prefetch: .keepFull, whenFull: .dropOldest)
.map { $0.value }
.eraseToAnyPublisher()
// send to thread/task/actor B
let notSendable = NotSendable(value: 0)
let cancellable = publisher
.sink {
// update some B-isolated state
notSendable.value = $0
}
Int is Sendable, so publisher would be Sendable with your extension
PassthroughSubject delivers elements synchronously as they're sent, so long as there's downstream demand
buffer creates upstream demand, but supplies elements only in response to downstream demand
map operates synchronously when elements are delivered
sink's closure is not marked @Sendable, so it can access B-isolated state
Now there are two situations:
Nothing sent through the subject yet. Creating the sink on B creates demand, which propagates through map to buffer, but there's no values.
Now A publishes a NonSendable value
which subject delivers synchronously to buffer
buffer delivers synchronously to map
map calls .value (safe! we're on A) and delivers it synchronously to sink
sink updates B-isolated state (unsafe! we're still on A)
Before publisher gets sent (so there's no sink and no demand)
A publishes a NonSendable value
which subject delivers synchronously to buffer
which buffer holds, until there's downstream demand
creating the sink on B creates demand, which propagates through map to buffer
buffer synchronously delivers the buffered value to map
map calls .value (unsafe! we're on B) and delivers it synchronously to sink
sink updates B-isolated state (safe! we're still on B)
Thanks @KeithBauerANZ that's a very good point! I guess we don't even need the NonSendable class here to expose this issue, as long as in Actor B we have publisher.sink { ... } then the code inside this sink could access isolated properties of B from different threads/actors depending on who publishes to the subject backing the publisher.
In case it wasn't clear from the above, you don't even need to send a Publisher to get into trouble:
// thread/task/actor A
// publish some A-isolated state
let subject = PassthroughSubject<NonSendable, Never>()
let publisher: AnyPublisher<Int, Never> = subject
.receive(on: DispatchQueue.global())
.sink {
// access A-isolated state on the global queue
$0.value
}
.store(in: &cancellables)
Here the publisher doesn't leave A, but the sink is run elsewhere.
I've kinda tried to avoid answering the question directly, because I don't think there's a clear-cut answer to your problem.
I believe strongly that the long-term answer is to replace Combine with AsyncSequence. But in the short-term, you probably have a mountain of Combine code you need to live with, and even with minimal concurrency checking in Swift 6's Swift 5 language mode, you're probably getting warnings that aren't helping you.
I really think it's up to you how to deal with them —
@preconcurrency import, if it works for you,
adding retroactive Sendable conformances, even though they're not strictly true, 'cos you still have to use the types as if they are,
leaving the warnings as signposts to dangerous code, so you know where to start looking when things explode, or when you're ready to start migrating off Combine
thanks a lot @KeithBauerANZ yeah I think that's a very good summary:
(1) short-term, @preconcurrency import Combine
(2) long-term, migrated Combine to AsyncSequence
There might be working solutions to those but then haven't reached consensus to get into even apple/swift-async-algorithms yet (let alone built-in framework), so the confidence isn't very high.
Now looking back at the Combine unsafe issue from this thread, you are right that unchecked sendable subject or publisher causes this loophole:
But we can say that directly passing a strong reference of self into sink isn't right anyways, i.e. retain cycle & memory leak. Let's say we have some linter to prevent this, so self has to be captured before passing into sink, then Swift 6 compiler actually warns us about the issue as expected:
FWIW, you can probably find other issues with replacing Combine publishers with AsyncSequences:
If you use a Task to listen to this never-ending AsyncSequences, it's very easy to inadvertently capture self, as Task.init's closure can implicitly capture self (the closure is annotated with @_implicitSelfCapture, see the source here, reasoning for the decision in SE-0304).
Because of this, it's very easy to extend the lifetime of self for the duration of the task (forever, if dealing with a never-ending sequence), so handling task through something like store(in: &cancellables) is hard to get right, as the AnyCancellable's deinit may never be called.
Not only doesn't AsyncStream allow for multiple observers, but the AsyncSequence protocol in general doesn't guarantee that multiple observers are supported. So it's hard to know at a glance if a for await loop (which only requires conformance to AsyncSequence) is iterating over a type supporting multiple observers or not.
I realize this may not be useful to you short term, but IMHO the best approach to the Swift 6 + Combine problem is not to replace every Combine publisher with an AsyncSequence. Instead, it may be useful to ask the question: Is there something else in the language now to achieve the goal I originally used Combine's publishers for?
In my experience, one major reason pushing for heavy use of Combine (or RxSwift) in projects was to make UI react to changes in the data source. Currently (at least on Apple platforms) there's some better alternatives for this goal, like SwiftUI + @Observable (or even ObservableObject), that remove the need of manually handling streams of changes.
Also, to interface with streams of changes, SwiftUI has a nice .task { ... } modifier that is automatically cancelled (ending any for await loops started within it [EDIT: depends on the underlying type of the sequence, see post below]) when the view disappears, instead of when some object is deallocated like AnyCancellables do (which makes the issue with self capture in never-ending AsyncSequences a non-issue).
For non-UI things, this is trickier and completely unique to every project. Lack of an equivalent to .store(in: &cancellables) can perhaps be elided by manually storing a reference to never-ending Tasks and cancelling them based on an external signal (for example, when a service is terminated), which may be more a more robust solution than relying on self being deallocated.
Small but important clarification: AsyncSequence on its own does not guarantee that iteration of a for await loops ends when the iterating task gets cancelled. The sequence's AsyncIterator should explicitly react to cancellation, e.g. by using withTaskCancellationHandler in its next() implementation.
Now, AsyncStream and AsyncThrowingStream do exactly this and thus end iteration promptly when the iterating task gets cancelled (Reference: stdlib source code). But if you're working with other AsyncSequences, you should check how they handle cancellation.
I agree with the general direction of thinking about alternative patterns for Combine as it will probably never support Concurrency.
But for certain use cases it’s also hard to replace. Mainly it’s cases where there is a broadcasting source (eg current user, current location, etc) and they are consumed by both UI and non-UI, and often involves some business logic not on main thread, and sometimes it’s necessary to combine the current value from multiple sources.
So while alternatives are explored, what do you think about the thread safety of using Combine with Swift 6 concurrency by relying on:
Only mark read only publisher with sendable output as unchecked sendable.
Use linter to require every closure passed to map, flatMap, sink, etc must capture self if self is used, which seems to mostly address the issue of leaking actor access to an unsafe thread.
Did I miss something in the Playground experiment in my last post?
Edit:
I saw another loophole is nonsendable objects on the actor, even when they are captured, Swift compiler doesn't protect against the unsafe usage:
so we'd also need some other rules:
3. don't allow map/flatMap/sink etc to capture nonsendable types. Capturing self (which is sendable) and access their unsinkable property is fine, see errors below:
I agree with the others in this thread, "replace Combine with AsyncSequence" will not be 1:1; in particular you don't want an object to own Tasks that capture itself. And yes, subjects are tricky because the backpressure situation is much stricter than Combine.
But I very much doubt, even with a semantics-aware (rather than just syntax-aware) linter that you could make usage of Combine safe.
I’m also very interested around this topic. Currently AsyncStream + consuming Task is not a trivial topic on its own. Mixing Combine with concurrency neither good too. Sometimes I find (unfortunately) simpler to convert async/await to Publisher to stick to Combine world instead.