Idea: AsyncQueue

While using the existing AsyncSequence features I always needed something like:

actor AsyncQueue<Element: Sendable>: AsyncSequence, Sendable {
    
    init(policy: Policy) { ... }
    
    nonisolated func enqueue(_ element: Element) { ... }
    
    enum Policy: Sendable {
        case unbounded
        ...
    }
    
}

We already have AsyncStream, which is great for mapping a callback based api to the async world - but it is awkward to use as a general purpose solution for sending values imperatively from a non async context to a buffered and shared async queue.

1 Like

Hi

It seems the equivalent to what would be a PassthroughSubject in the Combine world.

An implementation could be:

In this new repo terminology I guess it would be like a BroadcastChannel. I think it is a common need and there should be a proposal for such a mechanism. I submitted the idea to @Philippe_Hausler. If there is an interest I’d like to try to write a proposal about that.

1 Like

Semantically, PassthroughSubject and AsyncQueue differ in that AsyncQueue is buffered and PassthroughSubject is not.

I was referring to PassthroughSubject just to establish the parallel with Combine.

As the implementation I linked is using an AsyncStream under the hood, it is buffered. But it is not clear if the intent behind this repo is to rely on AsyncStream or come up with a different implementation.

To make this be PassthroughSubject++ (or CurrentValueSubject++) you'll need what @Philippe_Hausler designated in his earlier post as a Distributor. i.e. you need something that will will wait for downstream distribution before accepting the next value. That needs something along the lines of DispatchSemaphore with an embedded Continuation to allow the downstreams to dispatch in parallel.

Given that phillipe basically spelled out the plan in that post (with all the attached mumbo jumbo about future products, blah blah blah) and that there are several places in current release where the notion of a Reducer (aka a Monoid) is explicitly implemented, my expectation is that we'll see an implementation of that RSN. If not, it's actually pretty easy to write.

the ++ in the previous post indicates that, unlike Combine's subjects, this version will get explicit backpressure in the form of the buffering policy on the Distributor. I do still feel that the current implementation is not being fully principled on drawing the right distinctions between pull and push however. But maybe that's in the works too.

I think in general semaphores should be avoided in concurrency; instead I think in the case of distribution of values that when sending a value it will suspend and be resumed when all interested parties have read from the iteration. So it is less of a "subject" style behavior but perhaps more of what the intent of use for shared values are.

if you are looking at it as an actor, perhaps then AsyncChannel + buffer does precisely what you are looking for

But the send of AsyncChannel is async, right? Which makes sense as a channel, but not so much as a queue.

Being an actor that send method is going to be async when called externally (which I thought was intentional on your part).

oh you are right - actually what I meant was:

actor AsyncQueue<Element: Sendable>: AsyncSequence, Sendable {
    
    init(policy: Policy) { ... }
    
    nonisolated func enqueue(_ element: Element) { ... }
    
    enum Policy: Sendable {
        case unbounded
        ...
    }
    
}

That might be better off being a final class in that case. Which makes it then fall into the AsyncStream family. Which I think for what you are describing it can be accomplished with a pretty straightforward wrapper to adjust the ergonomics of AsyncStream for that use case.

I ask myself if AsyncQueue would be general enough to be worth adding to the package.

You are right, it is easy to build, even easier with Combine:

PassthroughSubject<SomeType, Never>().values.buffer(policy: .unbounded)

But maybe it would still be useful to add a separate type for it, since Combine is not present on all platforms and creating a wrapper around AsyncStream is easy but should not be necessary if something like AsyncQueue is a useful general building block (which I am not sure of).

Since I do mostly app development, I'm probably biased.

In general, we need to think carefully about what should be a block and what should be just a combination of the existing operators.

Precisely my intent. The construct I've been using looks like:

public actor Semaphore {
    var continuation: UnsafeContinuation<Void, Never>?
    var count: Int

    public init(continuation: UnsafeContinuation<Void, Never>, count: Int) {
        self.continuation = continuation
        self.count = count
    }

    public func decrement() {
        guard let continuation = continuation, count > 0 else {
            fatalError("Semaphore in inconsistent state")
        }
        count -= 1
        if count == 0 {
            self.continuation = .none
            continuation.resume()
        }
    }
}

called sort of like this:

await withUnsafeContinuation({ continuation in
            let semaphore = Semaphore(continuation: continuation, count: 2)
            downstream1.service.yield((semaphore, currentValue))
            downstream2.service.yield((semaphore, currentValue))
        })

Is this sort of what you meant? I admit choosing the name Semaphore for such an actor might be a bit triggering, but its a Swift Concurrency-like version of the idea.

I have found that this is useful. I'm using my version of PassthroughSubject to allow multiple downstream subscribers. Each of those downstreams needs to quickly pull the value and enqueue it locally. My thinking is to try to drive almost everything off of demand and lazy evaluation, and introduce push with limited queue sizes only explicitly in places where it needs it.

IIUC, do you mean you want an equivalent to Combine's CurrentValueSubject? That's definitely something I noticed that was missing from the package. If we had such an equivalent (I'll call it AsyncBufferChannel for now) it would let us implement an equivalent to @Published too (without the ObservableObject magic of course).

I personally want PassthroughSubject, CurrentValueSubject and a 3rd one that reduces on values. I need for this to handle multiple "subscribers" downstream and that's what the discussion on Distributor is about.