Combine's PassthroughSubject inside an actor, is it safe?

Hello, I tried to implement a dependency that synchronizes some values in a dictionary and then sends that data out through an observe function. The observer is an AsyncStream, but internally I am using a PassthroughSubject. When I tried to use a pure AsyncStream, the observer was cancelled when I left the screen where it is used, and when I returned it never restarted and values were no longer received. Even if I don't cancel it, it still does not continue receiving values. That is the main reason why I used PassthroughSubject. My implementation looks like this (simplified):

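import Combine
import ConcurrencyExtras // provides eraseToStream()

actor TestActor {
   private let subject = PassthroughSubject<[String: String], Never>()
   private var items: [String: String] = [:]

   // Stores the new value and publishes the whole dictionary.
   func update(key: String, value: String) {
     items[key] = value
     subject.send(items)
   }

   // Bridges the subject's values into an AsyncStream.
   func observe() -> AsyncStream<[String: String]> {
     subject
       .values
       .eraseToStream()
   }
}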

The subject’s values are converted to an AsyncStream using the eraseToStream() extension from pointfreeco/swift-concurrency-extras on GitHub (Sources/ConcurrencyExtras/AsyncStream.swift, main branch).

Then I observe and update this actor from a few places in the app, from concurrent contexts.

let testActor = TestActor()

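Task {
   // Iterate the stream; calling observe() alone would discard it.
   for await items in await testActor.observe() {
      print(items)
   }
}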

Task {
   await testActor.update(key: "key", value: "value")
}

I have enabled complete strict concurrency checking and I didn’t see any warnings on that code. Still, I am afraid that this could be problematic because of some internal implementation detail of the subject that even the actor can’t cover. Thanks for your help.

PS: If someone can tell me why the AsyncStream stops providing new values when I switch screens, I would be grateful.

PassthroughSubject is not what makes it work as you expected with the stream; creating a new stream each time is. SwiftUI cancels the task once the view leaves the screen (I assume you are using .task on views to start listening to the stream), and that in turn cancels the stream. By migrating to PassthroughSubject you have separated the two: the subject lives on its own, and a stream is created each time the view appears, so when you go back it is not in a cancelled state. If you create a new stream each time in the stream-only version, updates will keep coming when you go back as well. So this works fine:

var body: some View {
    ContentView().task {
        let stream = await actor.stream()
        for await items in stream {
            self.items = items
        }
    }
}

with the actor modified roughly like this:

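actor TestActor {
    typealias Items = [String: String]
    typealias Stream = AsyncStream<Items>

    private var continuation: Stream.Continuation?
    private var items: Items = [:]

    // Same update as in the question, yielding to the stream instead of a subject.
    func update(key: String, value: String) {
        items[key] = value
        continuation?.yield(items)
    }

    // Creates a fresh stream on every call and replaces the stored continuation.
    func stream() -> Stream {
        var continuation: Stream.Continuation?
        let stream = Stream { continuation = $0 }
        self.continuation = continuation
        return stream
    }
}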

Now it will create a new stream each time the screen appears, keeping updates alive.
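To illustrate why the original stream-only version went quiet, here is a minimal sketch that is independent of SwiftUI. It assumes Swift 5.9+ for AsyncStream.makeStream, and the function name demonstrateCancelledStream is made up for this example. The idea is that once the task consuming a stream is cancelled, the stream is terminated for good, and later yields are rejected rather than delivered.

```swift
// Hypothetical helper, not part of the original code.
// Returns true if the stream rejects values after its consumer was cancelled.
func demonstrateCancelledStream() async -> Bool {
    let (stream, continuation) = AsyncStream<Int>.makeStream()

    // Stands in for the SwiftUI .task that is cancelled when the view disappears.
    let consumer = Task {
        for await value in stream {
            print("received", value)
        }
    }
    consumer.cancel()
    await consumer.value // wait until the loop has exited

    // The stream is now terminated, so this value is not delivered to anyone.
    let result = continuation.yield(1)
    if case .terminated = result {
        return true
    }
    return false
}
```

This is why reusing one long-lived stream across screen visits does not work, while handing out a new stream per visit does.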

I will try your solution and I hope it will work so I can get rid of PassthroughSubject. I will report back.

Thanks for your help!

I worry that the offered solution does not allow sharing. Every call of stream() abandons the previous instance, and only the last stream is updated (because the continuation is overwritten). I guess this is different behavior from the solution with the subject, which shares updates across all streams. Or have I missed something?

Well, yes and no, I think. You will have only one view on the screen at a time anyway, so each view that appears will replace the previous one, whose stream has already been cancelled, and pick up the current state (since the items are saved in the actor anyway). So it should work fine for that case.

But there might be cases where you need simultaneous subscribers, that’s right. Nothing prevents you from supporting this by saving an array of continuations instead of a single one (and don’t forget to listen to onTermination to remove the cancelled ones).
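A minimal sketch of that multi-subscriber variant could look like the following. It is not the code from this thread: the name ItemsStore, the UUID keys (a dictionary instead of an array, to make removal simple), and the replay of the current items to each new subscriber are all assumptions made for illustration. Note that the replay differs from PassthroughSubject, which does not replay. It also assumes Swift 5.9+ for AsyncStream.makeStream.

```swift
import Foundation

// Hypothetical actor; names and replay behavior are assumptions.
actor ItemsStore {
    typealias Items = [String: String]

    private var items: Items = [:]
    // One continuation per live subscriber, keyed by a token so it can be removed.
    private var continuations: [UUID: AsyncStream<Items>.Continuation] = [:]

    func update(key: String, value: String) {
        items[key] = value
        // Fan the new state out to every active stream.
        for continuation in continuations.values {
            continuation.yield(items)
        }
    }

    func stream() -> AsyncStream<Items> {
        let id = UUID()
        let (stream, continuation) = AsyncStream<Items>.makeStream()
        // When the consumer is cancelled or the stream finishes, drop its continuation.
        continuation.onTermination = { [weak self] _ in
            Task { [weak self] in
                await self?.remove(id)
            }
        }
        continuations[id] = continuation
        // Assumption: new subscribers start with the current state.
        continuation.yield(items)
        return stream
    }

    private func remove(_ id: UUID) {
        continuations[id] = nil
    }
}
```

Each view would call stream() from its own .task and iterate the result with for await, so every visible subscriber receives every update, and a cancelled subscriber is cleaned up through onTermination.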

It is a bit more work than a publisher, but I think it is worth it, as mixing Combine and Swift Concurrency feels a bit odd anyway, and the framework is clearly on the road to gradually fading away.

Hi, I tried a solution similar to the one you suggested (with an array of continuations) and it works as expected. Thanks!