We already have AsyncStream, which is great for mapping a callback-based API to the async world, but it is awkward to use as a general-purpose solution for sending values imperatively from a non-async context to a buffered and shared async queue.
It seems to be the equivalent of a PassthroughSubject in the Combine world.
An implementation could be:
In this new repo's terminology I guess it would be like a BroadcastChannel. I think it is a common need and there should be a proposal for such a mechanism. I submitted the idea to @Philippe_Hausler. If there is interest, I'd like to try to write a proposal about that.
I was referring to PassthroughSubject just to establish the parallel with Combine.
As the implementation I linked is using an AsyncStream under the hood, it is buffered. But it is not clear if the intent behind this repo is to rely on AsyncStream or come up with a different implementation.
To make this be PassthroughSubject++ (or CurrentValueSubject++) you'll need what @Philippe_Hausler designated in his earlier post as a Distributor, i.e. something that will wait for downstream distribution before accepting the next value. That needs something along the lines of DispatchSemaphore with an embedded Continuation to allow the downstreams to dispatch in parallel.
Given that Philippe basically spelled out the plan in that post (with all the attached mumbo jumbo about future products, blah blah blah) and that there are several places in the current release where the notion of a Reducer (aka a Monoid) is explicitly implemented, my expectation is that we'll see an implementation of that RSN. If not, it's actually pretty easy to write.
The ++ in the previous post indicates that, unlike Combine's subjects, this version will get explicit backpressure in the form of the buffering policy on the Distributor. I do still feel that the current implementation is not fully principled in drawing the right distinctions between pull and push, however. But maybe that's in the works too.
I think in general semaphores should be avoided in concurrency. Instead, for distributing values, I think that sending a value should suspend and be resumed once all interested parties have read it from the iteration. So it is less of a "subject"-style behavior and perhaps closer to the intended use of shared values.
If you are looking at it as an actor, perhaps AsyncChannel + buffer does precisely what you are looking for.
That might be better off being a final class in that case, which makes it fall into the AsyncStream family. I think what you are describing can be accomplished with a pretty straightforward wrapper that adjusts the ergonomics of AsyncStream for that use case.
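For illustration, here is a minimal sketch of what such a wrapper might look like. The name AsyncQueue and its send/finish methods are hypothetical (they are not part of the standard library or of the package), and since it sits on a plain AsyncStream it is meant for a single consumer rather than for broadcasting:

```swift
// Hypothetical wrapper: exposes the AsyncStream continuation as synchronous
// send/finish calls so values can be pushed from a non-async context.
public final class AsyncQueue<Element: Sendable>: Sendable {
    /// The stream to iterate with `for await` (assumed to have one consumer).
    public let stream: AsyncStream<Element>
    private let continuation: AsyncStream<Element>.Continuation

    public init(bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
        // The build closure runs synchronously, so the continuation is set
        // before the initializer returns.
        var captured: AsyncStream<Element>.Continuation?
        stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { captured = $0 }
        continuation = captured!
    }

    /// Enqueues a value without suspending; buffering follows the chosen policy.
    public func send(_ value: Element) {
        continuation.yield(value)
    }

    /// Ends the stream once the buffered values have been consumed.
    public func finish() {
        continuation.finish()
    }
}

// Usage: values sent before iteration starts are buffered.
let queue = AsyncQueue<Int>()
queue.send(1)
queue.send(2)
queue.finish()

var received: [Int] = []
for await value in queue.stream {
    received.append(value)
}
```

With the default unbounded policy nothing is dropped, so the loop above sees 1 and then 2 before the stream ends.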
But maybe it would still be useful to add a separate type for it, since Combine is not present on all platforms and creating a wrapper around AsyncStream is easy but should not be necessary if something like AsyncQueue is a useful general building block (which I am not sure of).
Since I do mostly app development, I'm probably biased.
Precisely my intent. The construct I've been using looks like:
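/// Counts down acknowledgements from downstreams and resumes the suspended
/// sender once all of them have reported in.
public actor Semaphore {
    // The sender's continuation; cleared once it has been resumed.
    var continuation: UnsafeContinuation<Void, Never>?
    // Number of downstreams that still have to acknowledge the value.
    var count: Int

    public init(continuation: UnsafeContinuation<Void, Never>, count: Int) {
        self.continuation = continuation
        self.count = count
    }

    /// Called once by each downstream after it has taken the value.
    public func decrement() {
        guard let continuation = continuation, count > 0 else {
            fatalError("Semaphore in inconsistent state")
        }
        count -= 1
        if count == 0 {
            // Last acknowledgement: resume the sender exactly once.
            self.continuation = nil
            continuation.resume()
        }
    }
}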
called sort of like this:
await withUnsafeContinuation { continuation in
    let semaphore = Semaphore(continuation: continuation, count: 2)
    downstream1.service.yield((semaphore, currentValue))
    downstream2.service.yield((semaphore, currentValue))
}
Is this sort of what you meant? I admit choosing the name Semaphore for such an actor might be a bit triggering, but it's a Swift Concurrency-like version of the idea.
I have found that this is useful. I'm using my version of PassthroughSubject to allow multiple downstream subscribers. Each of those downstreams needs to quickly pull the value and enqueue it locally. My thinking is to try to drive almost everything off of demand and lazy evaluation, and to introduce push with limited queue sizes only in the places that explicitly need it.
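To make the consumer side concrete, here is a self-contained sketch of how the pieces could fit together. The Delivery alias and the makeDownstream and send helpers are made up for this example, each downstream is assumed to be a plain AsyncStream, and the actor is a compact copy of the one above so the snippet compiles on its own:

```swift
// Compact copy of the countdown actor from the post.
actor Semaphore {
    private var continuation: UnsafeContinuation<Void, Never>?
    private var count: Int

    init(continuation: UnsafeContinuation<Void, Never>, count: Int) {
        self.continuation = continuation
        self.count = count
    }

    func decrement() {
        guard let continuation = continuation, count > 0 else {
            fatalError("Semaphore in inconsistent state")
        }
        count -= 1
        if count == 0 {
            self.continuation = nil
            continuation.resume()
        }
    }
}

// Hypothetical payload: the value plus the semaphore to acknowledge it on.
typealias Delivery = (semaphore: Semaphore, value: Int)

// Hypothetical downstream: pulls each delivery, enqueues the value locally,
// then acknowledges so the sender can move on.
func makeDownstream() -> (input: AsyncStream<Delivery>.Continuation, task: Task<[Int], Never>) {
    var input: AsyncStream<Delivery>.Continuation?
    let stream = AsyncStream<Delivery>(bufferingPolicy: .unbounded) { input = $0 }
    let task = Task { () -> [Int] in
        var local: [Int] = []
        for await delivery in stream {
            local.append(delivery.value)
            await delivery.semaphore.decrement()
        }
        return local
    }
    return (input!, task)
}

// Suspends until every downstream has acknowledged the value.
// Assumes `downstreams` is non-empty; with zero downstreams it would never resume.
func send(_ value: Int, to downstreams: [AsyncStream<Delivery>.Continuation]) async {
    await withUnsafeContinuation { (continuation: UnsafeContinuation<Void, Never>) in
        let semaphore = Semaphore(continuation: continuation, count: downstreams.count)
        for downstream in downstreams {
            downstream.yield((semaphore, value))
        }
    }
}

let first = makeDownstream()
let second = makeDownstream()
let inputs = [first.input, second.input]

for value in 1...3 {
    await send(value, to: inputs) // returns only after both downstreams acknowledged
}
inputs.forEach { $0.finish() }

let firstValues = await first.task.value
let secondValues = await second.task.value
```

Both downstreams end up with 1, 2, 3 in order, and the sender never gets more than one value ahead of the slowest one, which is the backpressure being discussed.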
IIUC, do you mean you want an equivalent to Combine's CurrentValueSubject? That's definitely something I noticed was missing from the package. If we had such an equivalent (I'll call it AsyncBufferChannel for now) it would let us implement an equivalent to @Published too (without the ObservableObject magic of course).
I personally want PassthroughSubject, CurrentValueSubject and a third one that reduces on values. I need this to handle multiple "subscribers" downstream, and that's what the discussion on Distributor is about.
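As a rough sketch of the idea (not a proposal for the package), a CurrentValueSubject-like type could hand every new subscriber its own AsyncStream that starts with the current value. Only the name AsyncBufferChannel comes from the paragraph above; the send, values and finish methods, the NSLock-based locking and the unbounded per-subscriber buffer are assumptions made for this example:

```swift
import Foundation

// Hypothetical CurrentValueSubject-like type. Each subscriber gets its own
// unbounded AsyncStream, so there is no backpressure on the sender.
final class AsyncBufferChannel<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var current: Value
    private var subscribers: [UUID: AsyncStream<Value>.Continuation] = [:]

    init(_ initial: Value) {
        current = initial
    }

    /// Stores the new value and forwards it to every active subscriber.
    func send(_ newValue: Value) {
        lock.lock()
        defer { lock.unlock() }
        current = newValue
        for continuation in subscribers.values {
            continuation.yield(newValue)
        }
    }

    /// Returns a stream that starts with the current value, then follows updates.
    func values() -> AsyncStream<Value> {
        AsyncStream<Value>(bufferingPolicy: .unbounded) { continuation in
            let id = UUID()
            lock.lock()
            continuation.yield(current)
            subscribers[id] = continuation
            lock.unlock()
            // Unregister when the consumer stops iterating or is cancelled.
            continuation.onTermination = { [weak self] _ in
                self?.remove(id)
            }
        }
    }

    /// Finishes all active subscribers. The lock is released first because
    /// finishing triggers onTermination, which takes the lock again.
    func finish() {
        lock.lock()
        let active = Array(subscribers.values)
        subscribers.removeAll()
        lock.unlock()
        active.forEach { $0.finish() }
    }

    private func remove(_ id: UUID) {
        lock.lock()
        subscribers[id] = nil
        lock.unlock()
    }
}

let channel = AsyncBufferChannel(0)
let early = channel.values()   // starts from the initial value
channel.send(1)
let late = channel.values()    // starts from the current value, 1
channel.send(2)
channel.finish()

var earlySeen: [Int] = []
for await value in early { earlySeen.append(value) }
var lateSeen: [Int] = []
for await value in late { lateSeen.append(value) }
```

Here the early subscriber sees 0, 1, 2 and the late one sees 1, 2. A subscriber created after finish() would get the last value and then wait forever, so a real implementation would also need to track the finished state.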