We were working on a small web crawler (just a toy implementation to explore the new concurrency system). Our design is roughly like this: we have a single job queue containing all the URLs that still need to be crawled. We have N workers, created using N child tasks. Each worker has a loop in which it tries to fetch the next URL from the queue (e.g. using queue.dequeue()). However, it might be the case that the queue has fewer items than there are workers (for example, initially there is just one URL in the queue). In this case, we want to suspend the worker.
We've implemented this suspension inside dequeue: when the queue is empty, we use withCheckedContinuation and store off the continuation. We then await it and, once it is resumed, call dequeue recursively. When items are added to the queue, we resume all stored continuations.
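To make the description above concrete, here is a minimal sketch of that suspension scheme. It is not the full code, and it makes some assumptions: the queue is modeled as an actor, and the names JobQueue, items, and waiting are hypothetical. Cancellation and shutting the workers down once crawling is finished are left out.

```swift
// Hypothetical sketch of the queue described above; the names and the
// choice of an actor are assumptions, not the actual implementation.
actor JobQueue<Element: Sendable> {
    private var items: [Element] = []
    // Continuations of workers that found the queue empty.
    private var waiting: [CheckedContinuation<Void, Never>] = []

    func enqueue(_ newItems: [Element]) {
        items.append(contentsOf: newItems)
        // Resume every suspended worker; each one re-checks the queue.
        let toResume = waiting
        waiting.removeAll()
        for continuation in toResume {
            continuation.resume()
        }
    }

    func dequeue() async -> Element {
        if items.isEmpty {
            // Suspend until enqueue resumes the stored continuation.
            await withCheckedContinuation { continuation in
                waiting.append(continuation)
            }
            // Another worker may have taken the item already, so try again.
            return await dequeue()
        }
        return items.removeFirst()
    }
}
```

Because all stored continuations are resumed at once, a worker that wakes up and still finds the queue empty simply suspends again through the recursive call.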
By the way, we can't model our queue as an async stream, because we want multiple workers to get the next element (async stream only supports a single task as the consumer).
It feels a bit "wrong" to have to use withCheckedContinuation to implement this suspension behavior. Is there a simpler way? I think the ideas from "Communicating between two concurrent tasks" don't apply, as we have multiple workers in different tasks.
Here's the full code. Please run it against a local URL, as it does not wait between fetching pages or back off when something goes wrong.