AsyncSequence algorithms + structured Concurrency

Hello everyone,

Over the past few months, I have implemented my fair share of root AsyncSequences and async algorithms for various transformations. One thing that I had to do a bunch of times, which felt wrong to me, was using unstructured Concurrency via Task{} to implement some of these algorithms. For example, the debounce algorithm needs to race a Task.sleep() against the upstream AsyncSequences production of elements. Once the Task.sleep() finished we need to forward the latest element to the consumer of debounce; however, we cannot just cancel the call to the upstream’s next() method since we need to buffer that element for a potential next call to next().
Due to these requirement what I have resorted to is spawning an unstructured Task{} on the first call to next() which sets up a task group where one child task consumes the upstream and another calls Task.sleep.

This works great; however, it requires the use of unstructured Concurrency which just feels wrong to me. In my head, we should be able to solve this with structured Concurrency. So we are either lacking something in the language or in the AsyncSequence protocol definition.

Some places where we spawn Tasks{} in async-algos:
debounce
merge

Potential solution

(The next paragraph are just my current thoughts. There probably is a better solution but this is where my current thought process is at)

I have thought about solving this a bit and one thing that I think we need is to have a separate child task driving (running) a chain of AsyncSeuences and a separate child task consuming the produced elements. I threw some code together quickly that extends the AsyncSequence protocol (This code has some obvious problems. I just wanna use it to kick off the discussion):

protocol AsyncSequence {
    associatedtype Element where Self.Element == Self.AsyncIterator.Element
    associatedtype AsyncIterator: AsyncIteratorProtocol 
    // Every `AsyncSequence` gets a new executor
    // (Naming is bad here. Executor is an overloaded term)
    associatedtype AsyncExecutor: AsyncExecutorProtocol
 
    func makeAsyncIteratorAndExecutor() -> (AsyncIterator, AsyncExecutor?)
}

protocol AsyncExecutorProtocol {
    // This methods needs to be called to run the whole chain.
    // Algorithms would setup their child tasks and call the upstream `run()` method.
    // Root `AsyncSequence`s probably have an empty implementation of this most of the times
    func run() async
}

A potential consumption pattern could look like this:

var (iterator, executor) = someSequence.debounce(for: .seconds(1)).makeAsyncIteratorAndExecutor()

await withTaskGroup(of: Void.self) { group in
    group.addTask { [take executor] in
        await executor.run()
    }
    group.addTask { [take iterator] in
        while let element = await iterator.next() {}
    }
}

This is using some hypothetical things like take in capture lists; however, it should bring the point across that there is something running the chain and another thing consuming it. The language could hide these construct from you and give you a nice for await in pattern.

I would love to get more opinions on all of this!
cc @Philippe_Hausler @ktoso

8 Likes

In the design process for AsyncSequence we took painstakingly detailed hints from not just the protocol hierarchy but also the code-gen for iteration. Your proposed change would not only be both ABI and API breaking but also would break that symmetry. How would you propose to have it code-gen? E.g. what would the for-await-in syntax emit? It seems to me there are some missing language model systems and this very well could be one of them.

Spawning tasks for work to be done isn’t a bad thing (provided it is not done too often) and I think perhaps a number of them could very well be smaller footprints (think child tasks).

I guess I would like to explore more on what the problem area is more so than just a gut feeling that something looks off. Many of these cases really do have a conceptual discrete task of work to be done in parallel to existing async work; if that is spelled by borrowing an executor or spelled by a specialized async call to create a single child task I think would be dependent upon exactly what is needed to be solved.

2 Likes

I do agree with @FranzBusch that this isn't quite a statement that we need an unstructured tasks. Structured tasks also work in parallel to existing work. The thing that a structured task has is a well-defined relationship to its parent, as well as a well-defined lifetime in terms of its parent. This strikes me as exactly right for the async algorithms case. For example, debounce should plausibly only require subtasks of the task that is iterating the AsyncIterator: if that Task is cancelled we shouldn't need the debounce task, nor should we need to do custom error propagation.

I think the real alarm in the implementation of debounce is that we appear to be manually structuring an unstructured Task: we create a Task, which we then have to manually propagate cancellation into. That strongly suggests that this Task is actually structured, and we've just not worked out how to express that relationship yet.

6 Likes

I know that my proposed code is breaking everything. The intention was just to get some discussion going

I don’t agree with this. All of the algorithms that I have implemented that currently spawn an unstructured Task don’t need work to happen in parallel. Rather what they truly need is some driving them that is parallel to the consumption.

Furthermore you can implement algorithms like debounce or merge with task groups without problems. The only reason why we can’t do it as an algo is because of the interface of AsyncSequence.

I strongly agree with this. We have basically reinvented child tasks without the compiler enforcing the rules. This also makes it slightly strange how cancellation is happening since in some cases we have to rely on the deinit of the iterator to cancel the unstructured task.

I would love to see some more ideas how to solve this problem in a fully structured way.

Perhaps what you are talking about is the concept of generators (a cousin of continuations that was considered early on in AsyncSequence but disregarded because it would have been a much bigger feature/work)

While you say that generators were discarded, can those still become a thing? Currently writing some streams is really not ergonomic.

// today
func foo() -> AsyncThrowingStream<Value, Error> {
  AsyncThrowingStream { continuation in
    Task {
      do { 
        continuation.yield(await getValue1())
        continuation.yield(value2)
        if condition {
          throw someError
        }
        continuation.yield(value3)
        continuation.finish()
      } catch {
        continuation.finish(throwing: error)
      }
    }
  }
}

// future
func foo() async throws -> async stream Value {
  yield await getValue1()
  yield value2
  if condition {
    throw someError
  }
  yield value3
}

They are not impossible, it was just at the point at which we were working there were some lacking features and development we needed (one component was move-only, which is coming to fruition).

I agree that the syntax you are laying out is considerably simpler but there are a lot of details we need to work out; both for synchronous generators and asynchronous generators.

I think however the analog you are really meaning is:

func foo() -> async throws stream Value {
  yield await getValue1()
  yield value2
  if condition {
    yield throw someError
  }
  yield value3
}
3 Likes

Just so that I clearly understand you. Are you saying they are not possible right now (which is fine) or at all / ever?

I don't know much about generators. I just posted the above example on slack recently and was told that it looked like generators which seemed to be positively received under the language devs but without any concrete plans. To me that example looks like a "simple" syntax sugar over AsyncThrowingStream (assuming we get typed throws one day).

@FranzBusch It is interesting that the task in that AsyncThrowingStream is almost (if not perhaps intended to be precisely) the same as the debounce case.

I am saying they were not possible previously due to missing stuff; but in the future I could see it to be a potential possibility - we left the option on the table to do it eventually if it made sense. I am not fully certain without doing a lot more research if we have all of the tools we need to build that.

2 Likes

I'm fine with "maybe possible in the future".

Is it? For me the example of @DevAndArtist looks more like having some task generating values while another is consuming them which can be modeled with structured concurrency.

In general, I don’t see the problem with root AsyncSequences currently but rather with the algos that want to have work that lives longer than the current call to next but is still bound to the consuming task

Would this algorithm be of any use, @DevAndArtist ?

Another type of Task relationship that is difficult/impossible to express with structured concurrency, is one where a Task may initiate a child-Task to be potentially shared amongst sibling Tasks. The original Task and its siblings may come and go in any order and be of any priority. Maybe this is easier to imagine as an inverse Task tree.

Currently, as it needs to be shared, the 'child' Task must actually be detached. Then, as its detached its priority needs to be decided upfront, whereas ideally it would probably adopt the priority of the parent with the highest-priority – which could change over time.

An example of this relationship would be a multicast asynchronous sequence, where multiple consumers are dependent on one producer.

1 Like