Simple multiple consumer model using structured concurrency?

Before structured concurrency, I would write a loop that started N worker “threads”, where each thread simply looped forever, reading from a deque (the “worklist”) for its next data item to be processed. Since the worklist might be empty, the worker threads used a semaphore to know when it was time to actually look at the deque, to get the next work item. The worker threads obviously spent time doing nothing, while blocked on the semaphore.

Whenever the rest of the program wanted to add to the worklist, it added a new item to the worklist and then called signal() on the semaphore. (The worklist itself was thread-safe. The point of the semaphore was to know when it was worth looking at the worklist.)
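
For reference, the semaphore model described above might look roughly like the sketch below. The names `SemaphoreWorklist` and `startWorkers` are made up for illustration, and a plain array stands in for the deque (so `removeFirst()` is O(n)); this is not the original code, only one way the description could be realized.

```swift
import Foundation

/// Hypothetical name: a thread-safe FIFO worklist paired with a counting semaphore.
final class SemaphoreWorklist<Item>: @unchecked Sendable {
    private var items: [Item] = []
    private let lock = NSLock()
    private let available = DispatchSemaphore(value: 0)

    /// Producer side: append the item, then signal that one more item is ready.
    func add(_ item: Item) {
        lock.lock()
        items.append(item)
        lock.unlock()
        available.signal()
    }

    /// Consumer side: block the calling thread until an item is ready, then take it.
    func next() -> Item {
        available.wait()
        lock.lock()
        defer { lock.unlock() }
        // The semaphore count never exceeds the number of stored items,
        // so the array cannot be empty here.
        return items.removeFirst()
    }
}

/// Hypothetical helper: starts `count` worker threads that loop forever.
func startWorkers<Item>(count: Int,
                        on worklist: SemaphoreWorklist<Item>,
                        process: @escaping @Sendable (Item) -> Void) {
    for _ in 0..<count {
        Thread.detachNewThread {
            while true {
                process(worklist.next())   // blocks while the worklist is empty
            }
        }
    }
}
```

Each idle worker here occupies a real thread parked on the semaphore, which is the part the rest of the thread tries to replace.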

I cannot figure out what the equivalent pattern using structured concurrency should be. The semaphore model does not require much code, so I would expect the replacement to be equally simple. What direction should I be going in?

Tasks are run on a thread pool, so you shouldn't need to write your own when using it.

Sorry, but that’s not really an answer. What is a good candidate for accumulating work to be done, that is executed by a set number of asynchronous tasks, given that the work to be done is not known a priori? I.e. over the lifetime of the program, new work items will be generated, and those items need to be executed by asynchronous means, with a fixed upper limit on how many can be running at a given time.

Describe the mechanism by which an item of work is enqueued, and the mechanism for “consuming” said items of work. Additionally, processing of items should begin in the order in which they were enqueued.

I am not entirely sure I understand your question, can you please clarify the following:

What prevents you from doing something like this?

while true {
  // work is generated here:
  let workItem = ...
  Task.detached {
    // do the work in here
  }
}

Under the hood, as far as I understand, this will use Grand Central Dispatch to do basically what you describe (it will put the task into a queue and some thread will pick it up and start working on it).

Imagine a program which receives events from an external source over time. This is the “generated” work, and so you can’t put that neatly (or even not-neatly :)) in the // work is generated here.

Each event received (say, over the network) creates a work item, and each work item wants to be processed in the background. For whatever reason, I am happy to be processing, say, at most 4 work items in parallel, but never more than that. Additionally, if I have more than 4 work items ready to be processed, I am sad if I don’t have 4 items being processed in parallel by my program.

I don’t get what is so mysterious about this subject, that I can’t find clear explanations/examples. The producer/consumer model (with multiple consumers) has got to be one of the most celebrated examples of concurrency, ever. You don’t put the producing code in the consumer! The consumer just reads the work to be done, without knowing where the heck it came from.

How in the world do we do this easily with Swift concurrency? Said differently, what is the equivalent of a condition variable or a semaphore with structured concurrency?

Let me try this another way. Consider a datastructure, where I can pushright() an item into the structure, without blocking. Consider that this datastructure has another function popleft() that will return, in order, items pushed via pushright().

popleft() is async and won’t throw an error if there is nothing to pop. It will just wait, patiently.

Imagine that I’m going to have 4 Tasks, all calling popleft() repeatedly, until the end of time.
Point me in the right direction to implement this datastructure.
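
One possible shape for such a data structure is sketched below, assuming a lock-protected class that parks waiting callers as continuations. The type name `AsyncDeque` is hypothetical, the method names are taken from the description above, and cancellation of a waiting popleft() is deliberately not handled.

```swift
import Foundation

/// Hypothetical sketch: synchronous pushright(), suspending popleft().
final class AsyncDeque<Item: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var items: [Item] = []                               // buffered work, oldest first
    private var waiters: [CheckedContinuation<Item, Never>] = [] // suspended consumers, oldest first

    /// Never suspends: hands the item to a waiting consumer, or buffers it.
    func pushright(_ item: Item) {
        lock.lock()
        if waiters.isEmpty {
            items.append(item)
            lock.unlock()
        } else {
            let waiter = waiters.removeFirst()
            lock.unlock()
            waiter.resume(returning: item)   // resume outside the lock
        }
    }

    /// Returns the oldest item, suspending (not blocking a thread) while empty.
    func popleft() async -> Item {
        await withCheckedContinuation { continuation in
            lock.lock()
            if items.isEmpty {
                waiters.append(continuation)
                lock.unlock()
            } else {
                let item = items.removeFirst()
                lock.unlock()
                continuation.resume(returning: item)
            }
        }
    }
}
```

With this in place, each of the 4 tasks would run something like `while true { process(await deque.popleft()) }`, where `process` is whatever the application does with an item. An actor could replace the lock, at the cost of making pushright() an awaited call.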

Right, this pattern is quite common in C++. I am myself quite new to Swift so I might not know the "best" way of doing this in Swift.

One thing I can think of is AsyncStream. If I understand the docs correctly, it allows you to write into the stream synchronously while reading from it asynchronously.

If you want to go more low-level you can use a DispatchGroup as a semaphore. I have done this successfully in the past.

The next thing I can think of is to write your queue in C++ (you can call continuations in C++ code so you could implement this semantic).

One thing that I have found a bit frustrating with Swift is that, as of today, it doesn't give me the same amount of control over concurrency primitives as C++ does. For example: if you want to limit yourself to 4 threads, that's possible, but a bit annoying to do today.

This pattern is common in EVERY language and system I have used for the past 20 years, so I’m taken aback it’s not a piece of cake, here today, in new Swift concurrency land.

AsyncStream doesn’t allow multiple different people to be “reading” it, so it works for the special case of one consumer thread/task.

AsyncStream might be the answer if it allowed for a set number of readers.

I think you're making this harder than it is.

  • You want to limit the number of work items in progress at one time, so you will need a queue of pending work items.

  • Since concurrency is involved, you'll need an actor to maintain the queue safely.

  • When a work item arrives (or is created, or whatever), it's given to the actor which decides whether to queue it or start a task for it.

  • When a task finishes, it notifies the actor, which can decide whether to start another task for a new work item.

There's no particular reason for these tasks to loop or perform multiple work items. That's an artifact of a thread-based design, where you care about keeping existing threads around. Tasks are lighter-weight than threads, so (except in unusual cases) you can just create them on demand.

Note that the number of tasks doesn't tell you the number of concurrent operations. The "amount" of concurrency available depends on the system and what else it is doing.

The implementation here involves a count and an array (or other queue structure). That's about it.
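
A minimal sketch of the actor outlined in the bullets above, under some assumptions: the name `WorkLimiter`, its `submit(_:)` method, and the `process` closure are invented for illustration, and an array serves as the pending queue.

```swift
/// Hypothetical sketch: an actor that is both the buffer and the dispatcher.
actor WorkLimiter<Item: Sendable> {
    private let limit: Int
    private let process: @Sendable (Item) async -> Void
    private var pending: [Item] = []   // queued work items, oldest first
    private var running = 0            // number of tasks currently in progress

    init(limit: Int, process: @escaping @Sendable (Item) async -> Void) {
        self.limit = limit
        self.process = process
    }

    /// Called when a work item arrives: start a task for it or queue it.
    func submit(_ item: Item) {
        if running < limit {
            start(item)
        } else {
            pending.append(item)
        }
    }

    private func start(_ item: Item) {
        running += 1
        let process = self.process
        Task.detached {
            await process(item)
            await self.finished()      // notify the actor when the task is done
        }
    }

    /// Called by a finishing task: start the next pending item, if any.
    private func finished() {
        running -= 1
        if !pending.isEmpty {
            start(pending.removeFirst())
        }
    }
}
```

A producer would call `await limiter.submit(item)` for each incoming event. Items start in submission order, and at most `limit` of them are in progress at once.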


That was true of AsyncStream originally, but AFAIK there's no restriction any more on waiting on an AsyncStream from multiple tasks.

However, I'm not sure it's especially helpful here, since you still have to count tasks yourself. You could certainly do that as part of the AsyncStream implementation, but I doubt that makes things any easier or more automatic, it just moves your counting code to a different place.

OK, this approach makes sense. Thank you. (The approach of an actor which either buffers the work or starts a task going if the number of running tasks is less than N.) I was missing that the actor was both the dispatching agent and the buffer at the same time.

If you can wait from multiple tasks, then you just start N tasks, each of which reads from the stream in a forever loop. That’s actually closest to the model I used to have.

It separates producer from consumer neatly: one agent pushes into the stream, some other number of agents pull from the stream.

No counting required. When the stream is empty, the N tasks are simply awaiting the next push.

may not be 100% of what you are asking, but I think you are dancing around the same kind of topic:

Actually, AsyncStream with the ability for multiple tasks to be reading from it at the same time is 100% of what I was looking for. I already had a similar structure, it’s just that I was using a DispatchSemaphore to block while waiting for more work.

With AsyncStream, the iteration is inherently blocking which does the exact same thing.
The final implementation becomes simple:

for _ in 0..<nTasks {                     // half-open range: exactly nTasks tasks
    Task {
        for await item in asyncStream {   // this is a "forever loop"
            process(item)                 // because nobody ever calls finish
        }                                 // on the asyncStream to terminate it.
    }
}

A separate piece of code is responsible for pushing items onto asyncStream, in whatever fashion and whenever it makes sense. Note: in actual practice, since the iteration over the stream by each task may go on for an unbounded amount of time, an auto-release pool mechanism would surround each call to process.
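
To show both halves together, here is a hedged sketch that creates the stream, starts the consumers, and hands the producer side back to the caller. The helper name `startConsumers` is hypothetical, `AsyncStream.makeStream(of:)` requires Swift 5.9 or later, and the sketch relies on the multi-task iteration behavior discussed earlier in this thread, which as far as I know is not a documented guarantee of AsyncStream.

```swift
/// Hypothetical helper: starts `count` consumer tasks on a fresh stream and
/// returns the continuation that producers use to push work.
func startConsumers<Item: Sendable>(
    count: Int,
    process: @escaping @Sendable (Item) async -> Void
) -> AsyncStream<Item>.Continuation {
    let (stream, continuation) = AsyncStream.makeStream(of: Item.self)
    for _ in 0..<count {
        Task {
            // Each item is delivered to only one of the consumers.
            // The loop ends only after finish() is called on the continuation.
            for await item in stream {
                await process(item)
            }
        }
    }
    return continuation
}
```

The producer then calls `yield(_:)` on the returned continuation whenever a new work item appears, which does not suspend, and `finish()` if the consumers should ever stop.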

In this scenario, nTasks is carefully chosen after study, and depending on the processor. For the sort of work I use it for, it is typically 2 or 4, i.e. nowhere near a value resulting in thread explosion. We deliberately want these to be separate threads in most cases because the work of process() may be CPU intense (though sometimes it is just network bounded).

With regular GCD, you use a concurrent dispatch queue and start up nTasks invocations of a function which loops, blocking on a semaphore, reading values from a deque. When it has no work, and is waiting on the semaphore, the thread remains, but presumably the processor switches in another thread to run. The Task-based approach simply avoids extra dormant threads.

Regarding your solution: you'll want to use Task.detached instead, otherwise you'll be running your process(item) in the same concurrency context, which usually means the same thread. So, you have just a suboptimal runloop.

Judging by:

In this scenario, nTasks is carefully chosen after study, and depending on the processor.

I agree with @tgoyne, it seems you want to build a worker pool. That's already handled for you with Swift Concurrency.

If you have any other reasons for which you'd like to control the number of tasks that enter the pool, then the solution @QuinceyMorris came up with is the idiomatic way to go. You could also theoretically write a custom executor, but as far as I know, custom executors are not available yet.

AsyncStreams were not designed for this.

If you only want to control the number of tasks so that you get the optimal performance for your CPU, just spawn as many tasks as you want with Task.detached and don't care about the consumers / executors. Accept that you don't control that part. The built-in executor will handle it.

Thanks for the feedback about using Task.detached. I'll check on that. I thought each spawned Task could potentially run on a different thread, but you're saying that's not the case?

It’s not about letting the CPU pick the optimal performance. I don’t want more than a certain amount of workers running (because it will overload our backend servers if too many requests come in too fast), nor less than a certain amount (because we won’t process fast enough).

But the other part of the equation is that workers need to block until there is work to be done. I’m not getting a clear answer on what the mechanism for a worker blocking, while waiting for work, should be. Certainly having a producer put a unit of work into an AsyncStream is an easy way of doing this. But if for whatever reason that is a “bad” model, please explain clearly and simply
(a) what a “push” of work looks like
(b) what a “pop” of work (that starts computation going) is

I’m not getting a clear answer on what the mechanism for a worker blocking, while waiting for work, should be

I'm sorry, but you have a clear answer above, you just have to change your mindset: you're not thinking within the Swift Concurrency paradigm.

Swift Concurrency is designed specifically to avoid the kind of blocking behaviour you want to use. It's a non-blocking model, and in fact, mixing Swift Concurrency with a blocking model (e.g. a semaphore) is an anti-pattern.

What does my worker task await on, to obtain the work it should be doing, or to yield control when there is no work available?

I didn't say I was waiting on a semaphore. In fact the whole point was to remove the semaphores in the first place, and replace them with a call to await that either (a) immediately returns work to be done, allowing my task to be useful, or (b) yields control because there is nothing useful for the task to do at the current time, because no new work is available.

Perhaps the approach of storing data describing the work to be done is not within the Swift Concurrency paradigm?

Instead, as a process determines each unit of work to be done, should it immediately add a Task object to some group/collection/etc and let those Task objects accumulate and be dealt with by whatever scheduling mechanism seems appropriate?

I guess you could do it that way, but that requires the entity determining the data defining the work to also know exactly what code is to be executed in response to that work, so as to form the Task. And I would guess the memory required to represent a Task that has yet to run is a good bit bigger than, say, a 100 or 200 byte string that might describe for example the next network path/image/etc to be fetched.
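
One way to keep pending work as plain data while still using structured concurrency is sketched below: a single loop reads items from a stream and creates a child task per item in a task group, never exceeding a fixed number in flight. The function name `drain` and its parameters are hypothetical, and this is only one option among those discussed above.

```swift
/// Hypothetical sketch: process stream items with at most `maxConcurrent`
/// child tasks running at once. Returns after finish() has been called on
/// the stream's continuation and all started items have completed.
func drain<Item: Sendable>(
    _ stream: AsyncStream<Item>,
    maxConcurrent: Int,
    process: @escaping @Sendable (Item) async -> Void
) async {
    await withTaskGroup(of: Void.self) { group in
        var inFlight = 0
        for await item in stream {
            if inFlight == maxConcurrent {
                _ = await group.next()   // suspend until one child finishes
                inFlight -= 1
            }
            group.addTask { await process(item) }
            inFlight += 1
        }
        // Remaining children are awaited automatically when the group's scope ends.
    }
}
```

Items that have not been started yet stay in the stream's buffer as data, so no Task exists for them until a slot is free, and the producer still needs nothing more than the stream's continuation.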