Hello everyone,
Over the past few months, I have implemented my fair share of root `AsyncSequence`s and async algorithms for various transformations. One thing I had to do a bunch of times, and which felt wrong to me, was using unstructured concurrency via `Task {}` to implement some of these algorithms. For example, the `debounce` algorithm needs to race a `Task.sleep()` against the upstream `AsyncSequence`'s production of elements. Once the `Task.sleep()` finishes, we need to forward the latest element to the consumer of `debounce`; however, we cannot just cancel the call to the upstream's `next()` method, since we need to buffer that element for a potential next call to `next()`.
Due to these requirements, what I have resorted to is spawning an unstructured `Task {}` on the first call to `next()`, which sets up a task group where one child task consumes the upstream and another calls `Task.sleep`.
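To make the shape of that workaround concrete, here is a rough sketch. It is not the swift-async-algorithms implementation: `SketchDebounce`, `LatestElement`, and `quietNanoseconds` are names made up for illustration, the upstream is fixed to `AsyncStream<Int>` to keep it short, and, unlike the description above, the unstructured task itself consumes the upstream while the child tasks only act as timers.

```swift
// Hypothetical helper: remembers the newest element plus a generation counter,
// so a timer can tell whether a newer element arrived while it was sleeping.
actor LatestElement {
    private var generation = 0
    private var value: Int?

    func store(_ newValue: Int) -> Int {
        generation += 1
        value = newValue
        return generation
    }

    func take(ifGeneration expected: Int) -> Int? {
        guard expected == generation else { return nil }
        defer { value = nil }
        return value
    }
}

// Hypothetical, simplified debounce over an AsyncStream<Int>.
struct SketchDebounce: AsyncSequence {
    typealias Element = Int
    let upstream: AsyncStream<Int>
    let quietNanoseconds: UInt64

    func makeAsyncIterator() -> Iterator {
        Iterator(upstream: upstream, quietNanoseconds: quietNanoseconds)
    }

    struct Iterator: AsyncIteratorProtocol {
        let upstream: AsyncStream<Int>
        let quietNanoseconds: UInt64
        var output: AsyncStream<Int>.Iterator? = nil

        mutating func next() async -> Int? {
            if output == nil {
                let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
                let upstream = self.upstream
                let quietNanoseconds = self.quietNanoseconds
                // The unstructured task: it has to outlive this call to next().
                let task = Task {
                    let latest = LatestElement()
                    await withTaskGroup(of: Void.self) { group in
                        for await element in upstream {
                            let generation = await latest.store(element)
                            // One timer child per element; stale timers do nothing.
                            group.addTask {
                                do {
                                    try await Task.sleep(nanoseconds: quietNanoseconds)
                                } catch {
                                    return // cancelled while sleeping
                                }
                                if let value = await latest.take(ifGeneration: generation) {
                                    continuation.yield(value)
                                }
                            }
                        }
                        await group.waitForAll()
                    }
                    continuation.finish()
                }
                // Tear the task down when the consumer stops iterating.
                continuation.onTermination = { _ in task.cancel() }
                output = stream.makeAsyncIterator()
            }
            return await output?.next()
        }
    }
}
```

The part that bothers me is the `Task { ... }` inside `next()`: nothing in the caller's task tree owns it, so cancellation has to be wired up by hand through `onTermination`.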
This works great; however, it requires the use of unstructured concurrency, which just feels wrong to me. In my head, we should be able to solve this with structured concurrency. So we are lacking something either in the language or in the `AsyncSequence` protocol definition.
Some places where we spawn `Task {}`s in async-algos:
- `debounce`
- `merge`
Potential solution
(The next paragraphs are just my current thoughts. There is probably a better solution, but this is where my thought process is at.)
I have thought about solving this a bit, and one thing I think we need is a separate child task driving (running) a chain of `AsyncSequence`s and a separate child task consuming the produced elements. I quickly threw some code together that extends the `AsyncSequence` protocol (this code has some obvious problems; I just want to use it to kick off the discussion):
```swift
protocol AsyncSequence {
    associatedtype Element where Self.Element == Self.AsyncIterator.Element
    associatedtype AsyncIterator: AsyncIteratorProtocol
    // Every `AsyncSequence` gets a new executor.
    // (Naming is bad here. Executor is an overloaded term.)
    associatedtype AsyncExecutor: AsyncExecutorProtocol

    func makeAsyncIteratorAndExecutor() -> (AsyncIterator, AsyncExecutor?)
}

protocol AsyncExecutorProtocol {
    // This method needs to be called to run the whole chain.
    // Algorithms would set up their child tasks and call the upstream `run()` method.
    // Root `AsyncSequence`s probably have an empty implementation of this most of the time.
    func run() async
}
```
A potential consumption pattern could look like this:
```swift
var (iterator, executor) = someSequence
    .debounce(for: .seconds(1))
    .makeAsyncIteratorAndExecutor()

await withTaskGroup(of: Void.self) { group in
    group.addTask { [take executor] in
        await executor.run()
    }
    group.addTask { [take iterator] in
        while let element = await iterator.next() {}
    }
}
```
This uses some hypothetical things like `take` in capture lists; however, it should get the point across that there is something running the chain and another thing consuming it. The language could hide these constructs from you and give you a nice `for await in` pattern.
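For anyone who wants to play with the run/consume split in today's Swift, here is a small approximation that compiles without `take`. Everything in it is an assumption made for illustration: `ReplayExecutor`, `makeStreamAndExecutor`, and `consumeAll` are made-up names, an `AsyncStream` stands in for the iterator side, and the executor just replays a fixed array instead of driving a real chain of algorithms.

```swift
// Hypothetical stand-in for the proposed protocol; not part of any library.
protocol AsyncExecutorProtocol: Sendable {
    func run() async
}

// Hypothetical executor: "runs the chain" by pushing elements into a stream.
struct ReplayExecutor: AsyncExecutorProtocol {
    let elements: [Int]
    let continuation: AsyncStream<Int>.Continuation

    func run() async {
        for element in elements {
            continuation.yield(element)
        }
        continuation.finish()
    }
}

// Plays the role of `makeAsyncIteratorAndExecutor()` from the sketch above.
func makeStreamAndExecutor(replaying elements: [Int]) -> (AsyncStream<Int>, ReplayExecutor) {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    return (stream, ReplayExecutor(elements: elements, continuation: continuation))
}

// One child task runs the chain, another consumes it; both are structured.
func consumeAll(replaying elements: [Int]) async -> [Int] {
    let (stream, executor) = makeStreamAndExecutor(replaying: elements)
    return await withTaskGroup(of: [Int].self, returning: [Int].self) { group in
        group.addTask {
            await executor.run()
            return []
        }
        group.addTask {
            var collected: [Int] = []
            for await element in stream {
                collected.append(element)
            }
            return collected
        }
        var result: [Int] = []
        for await partial in group {
            result += partial
        }
        return result
    }
}
```

Here no `Task {}` is involved at all: cancelling the caller cancels both the running side and the consuming side, which is the property I would like algorithms such as `debounce` to have.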
I would love to get more opinions on all of this!
cc @Philippe_Hausler @ktoso