I have searched for couple of hours on async sequence / stream (I am still new to the swift concurrency) and can not find anything that resembles a PassthroughSubject or CurrentValueSubject from Combine which I can easily bridge from imperative code (think delegates from CoreLocation, CoreBluetooth etc.).
Can someone point me to a documentation for such type if it exists or share some resource on how to implement one if available?
If I use the above examples I would want to achieve something like
for await location in locationService.locationsSequence {
print(location)
}
I don't think there's a real subject equivalent yet. There's some work going on in the AsyncAlgorithms package, but we don't have the full multicast capabilities of Combine. There's AsyncChannel (sp?) but the equivalent send(_:) function is async and isn't the fire-and-forget API that a Combine Subject permits.
One thing that I've been using as a stop gap is to wrap a Subject in an AsyncStream. This leans on Combine's multicasting capabilities while we wait for some enhanced multicasting capabilities.
Essentially, create a new new AsyncStream for each subscriber that wraps the subject you wish to multicast.
I would typically create a convenience along the lines of:
final class SomeClass {
private let someTypeSubject = PassthroughSubject<SomeType, Never>()
public var someTypeStream: AsyncStream<SomeType> {
AsyncStream { continuation in
let cancellable = self.someTypeSubject.sink { continuation.yield($0) }
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}
This example use of AsyncStream is great as it allows us to connect non-Sendable emission logic to an AsyncSequence which itself will be Sendable if the Element it emits is Sendable. For example in my personal use case I want to move some UI events generated by a PassthroughSubject into the background and Sendable realm.
There's one easy to miss catch though. The AsyncStream in the example is flawed as it will buffer the events sent by the PassthroughSubject. This is not what you'd expect from such Subject. To fix this issue one has to explicitly set a different AsyncStream<Element>.Continuation.BufferingPolicy from the default .unbound to .bufferingOldest(0). This will resume all awaiting calls to iterator.next() when you yield a new value, but it will discard / drop everything if there is no active await anywhere.
Here's a playground example:
import PlaygroundSupport
import Combine
let page = PlaygroundPage.current
page.needsIndefiniteExecution = true
func foo() async {
let subject = PassthroughSubject<Int, Never>()
let stream = AsyncStream(bufferingPolicy: .bufferingOldest(0)) { continuation in
let cancellable = subject.sink { signal in
continuation.yield(signal)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
var iterator = stream.makeAsyncIterator()
Task {
print("will wait", Date.now)
do {
try await Task.sleep(nanoseconds: 2_000_000_000)
} catch {
print(error)
}
print("will resume and send", Date.now)
subject.send(1)
subject.send(2)
print("will wait", Date.now)
do {
try await Task.sleep(nanoseconds: 4_000_000_000)
} catch {
print(error)
}
print("will resume and send", Date.now)
subject.send(3)
print("will wait", Date.now)
do {
try await Task.sleep(nanoseconds: 1_000_000_000)
} catch {
print(error)
}
print("will resume and send", Date.now)
subject.send(4)
}
print(await iterator.next() as Any)
do {
try await Task.sleep(nanoseconds: 2_000_000_000)
} catch {
print(error)
}
print(await iterator.next() as Any)
print(await iterator.next() as Any)
}
Task {
await foo()
print("finishing")
page.finishExecution()
}
With .unbound this example will print 1, 2, 3, but with .bufferingOldest(0) it will result in 1, 3, 4 with the expected miss for the value 2.
Just on a further note. cancellation is of non-Sendable type (at this moment) and under strict concurrency checking cannot be captured by continuation.onTermination. This is the issue I'm trying to solve right now.
Yes, this is absolutely true. In my use case it didn't matter too much as I was immediately iterating the AsyncStream but if you intend to hold on to it for any period of time, it's critical to consider the buffering policy. Thanks for highlighting this.
The other important note is that this doesn't make AsyncStream a 'multi-pass' AsyncSequence, a fresh AsyncStream is required for each new consumer to safely iterate the values from the Subject.
More generally, I'm really keen on seeing Subject shaped AsyncSequence type in the standard library, but my feeling is we're stuck on waiting for a resolution on typed throws and the primary associated type for AsyncSequence before this type and the full potential of AsyncSequence can be realised.
In my case that's fine as I only need a single stream instance to work with at a single place.
After toying around with the buffer policy and reading through the internal buffer implementation I just realized that with your example and a slight adjustment to .bufferingNewest(1) we can also wrap and mimic CurrentValueSubject as well.
I think that would work with your use case, as you're using it for a single consumer. But if someone wishes to share the Subject amongst multiple consumers, they'd need to create a new AsyncStream, and therefore a new buffer, for each consumer, so it may not work quite as expected in that case.
True, I also run into some tight corners as I need to consume the values and then perform typical async transformations like merge. Since I had to operate on the AsyncIterator level for my needs, I'm out of luck moving back into the AsyncSequence realm. In my particular case I have inout AsyncIterator in so many places just to make sure that I'm mutating the same instance and not assuming that it's build with reference semantics even though that it is in several cases.
Totally agree. I'm hopeful once we see resolution on typed throws and primary associated types, we'll see some forward progress. Until then though, my feeling is that we're severely hamstrung on what we can do to get it to parity with other reactive frameworks.
Either of these provide comprehensive support to the core Async/Await functionality that provides a nice way to work with them like reactive streams.
I would suggest to review your requirements and dig into both libraries. They both supported, although AsyncExtensions has a more recent release. However The asynchrone libraries seem more comprehensive.
Unfortunately AsyncChannel doesn't seem to support multi casting - would be great to see that added so I can fully replace Combine in my apps.
i.e. the following only prints "Hello!" once.
let channel = AsyncChannel<String>()
Task {
for await element in channel {
print(element)
}
}
Task {
for await element in channel {
print(element)
}
}
await channel.send("Hello!")
Would love to see an official type in the swift-async-algorithms package that could replace Combine's current value subject! (and perhaps an AsyncStream version of @Published to go with it )
Would someone of the maintainers of AsyncAlgorithms mind to give a status update on those PRs that are tagged v1.1? Some of them have been lying around for quite some time already. What are those waiting on?
The broadcasting mentioned in this thread is essential to get more Concurrency coverage, in my opinion.
Thanks.