`async var` and explicit child-task cancellation

Was it ever discussed why structured concurrency only permits cancellation from the root task and if async var would be banned forever?
I have a perfect example of a theoretically structured algorithm which does not outlive the parent task, but I can only implement it using an unstructured task, because (a) I cannot spawn a child task, and step over it (async let comes close but it has to be async var) and (b) pick it up later for explicit non-root cancellation when needed.

func collectLatest<T>(
  _ next: () async throws -> T?, 
  consume: (T) async throws -> Void 
) async rethrows {
  if Task.isCancelled { return }

  async var task: Void? = nil
  while let element = try await next() {
    try await cancel task // strawman syntax; explicitly cancel the structured sub-task
    task = consume(element) // spawn next child task; only one will run at a time
  }

  // async child task cancellation and await is implicit on scope exit, but we
  // want to possibly rethrow its error
  try await cancel task
}

Again, I can implement it using unstructured tasks, but I also keep seeing people saying that one should try to avoid using unstructured tasks as much as possible. Well I'd love to fully utilize structured concurrency, but it seems to be a bit limited in that particular area.


While reading through past proposals and the new with[Throwing][Discarding]TaskGroup proposal I think it should be possible to model async var using the discarding task group as it allows to discard the child task eventually.

There are only two more puzzle pieces missing.

  1. The above algorithm is flawed as there's no way to immediately re-throw the error from the consume(element) child task, unless next emits and element or nil and we explicitly try to avoid for the error or Void result of the last child task.

  2. There's not explicit child task cancellation. I think the with[Throwing][Discarding]TaskGroup could also have a @discardableResult on addTask and return a simple Sendable wrapper, that provides very similar API as Task. It would have func cancel(), var isCancelled: Bool and possibly also value property (I'm not sure about the latter property though). Escaping those would be fine as the postcondition returning from with[Throwing][Discarding]TaskGroup would only make the wrapper return true from isCancelled and func cancel() being a no-op.


A second version with hypothetically extended withThrowingDiscardingTaskGroup:

func collectLatest<T>(
  _ next: () async throws -> T?, 
  consume: @escaping @Sendable (T) async throws -> Void 
) async rethrows {
  if Task.isCancelled { return }

  try await withThrowingDiscardingTaskGroup { group in

    var childTask: ChildTask<Void, any Error>? = nil

    while let element = try await next() {

      if group.isCancelled {
        break
      }

      childTask?.cancel()
      try await childTask?.value

      childTask = group.addTask { // NEW: returns a child task wrapper
        try await consume(element)
      }
    }

    childTask?.cancel()
    try await childTask?.value
  }
}

cc @lukasa @ktoso since you guys worked on the recent discarding task group implementation, maybe you could provide some feedback on this idea.

I've looked up the swift code for task groups and it looks like every time a child task is spawned there's a retuned NativeObject that gets discarded. Is that the object responsible for the child task, which could potentially be wrapped in a Task like API and be exposed / returned through the addTask method?


Especially with discarding task groups this makes a lot of sense to be able to explicitly control the cancellation of child tasks.


Here's an untested (couldn't get the snapshot to work) implementation using withThrownigDiscardingTaskGroup that still relies on an unstructured pseudo child task:

func collectLatest<T>(
  _ next: () async throws -> T?,
  consume: @escaping @Sendable (T) async throws -> Void
) async rethrows {
  if Task.isCancelled { return }

  try await withThrowingDiscardingTaskGroup { group in

    var task: Task<Void, any Error>? = nil

    while let element = try await next() {

      if group.isCancelled {
        break
      }

      task?.cancel()
      try await task?.value

      let newTask = Task.detached {
        try await consume(element)
      }
      task = newTask

      // Add to the current group as the cancellation should be immediately forwarded and in case
      // the task throws first, the error should be re-thrown asap.
      //
      // This child task will be discarded as soon as the inner operation terminates.
      // The termination of the previous task is guaranteed.
      group.addTask {
        try await withTaskCancellationHandler {
          try await newTask.value
        } onCancel: {
          newTask.cancel()
        }
      }
    }

    task?.cancel()
    try await task?.value
  }
}

I've made a new implementation using two ThrowingTaskGroups but I'm not sure if it's safe and stable, as I had to silence the compiler about the unavailability of sendability, which otherwise would have been implicit. However it was explicitly made unavailable by the stdlib so that we don't escape it. This example tries to demonstrate that it could be still used to implement interesting structured algorithms.

struct _UncheckedThrowingTaskGroupBox: @unchecked Sendable {
  let group: ThrowingTaskGroup<Void, any Error>
}

actor Handler {
  var box: _UncheckedThrowingTaskGroupBox? = nil

  init() {}

  func retain(_ box: _UncheckedThrowingTaskGroupBox) {
    self.box = box
  }

  func release() {
    box = nil
  }

  func cancelAndRelease() {
    box?.group.cancelAll()
    release()
  }
}

func collectLatest<T>(
  _ next: () async throws -> T?,
  consume: @escaping @Sendable (T) async throws -> Void
) async rethrows {
  if Task.isCancelled { return }

  try await withThrowingTaskGroup(of: Void.self) { group in

    let handler = Handler()

    while let element = try await next() {
      if group.isCancelled {
        break
      }

      await handler.cancelAndRelease()
      try await group.next()

      group.addTask {
        try await withThrowingTaskGroup(of: Void.self) { group in
          group.addTask {
            try await consume(element)
          }
          await handler.retain(_UncheckedThrowingTaskGroupBox(group: group))
          do {
            try await group.next()
            await handler.release()
          } catch {
            await handler.release()
            throw error
          }
        }
      }
    }

    await handler.cancelAndRelease()
    try await group.next()
  }
}


It's sort of hard to follow what you want here, but I can respond to the question of explicit child task cancellation. I can't think of a particular technical reason to forbid that: it just requires API surface and Swift Evolution work.

1 Like

What about a possible async var though?

I would imagine that when assigning a new async call to it, it will just implicitly cancel the previous one and await it at some point before existing from the surrounding sync scope.


I have another [Throwing]TaskGroup related question. Does the call to group.next() within the body closure guarantees that the pending task will be drained from the group and not leak?

I've changed the above implementation of collectLatest to use and capture an AsyncChannel which basically should send a cancellation signal into the inner task group to make it emit one result, which then will cancel only that inner group. In other words it's a technique to create a structured pseudo cancellable child-task.

func collectLatest<T>(
  _ next: () async throws -> T?,
  consume: @escaping @Sendable (T) async throws -> Void
) async rethrows {
  try await withThrowingTaskGroup(of: Void.self) { group in

    var channel: AsyncChannel<Never>? = nil

    func cancel() async throws {
      channel?.finish()
      channel = nil

      // drain the last child task
      try await group.next()
    }

    while let element = try await next() {
      try await cancel()

      let newChannel = AsyncChannel<Never>()

      group.addTask {
        try await withThrowingTaskGroup(of: Void.self) { group in

          group.addTask {
            for await _ in newChannel {}
          }

          group.addTask {
            try await consume(element)
          }

          try await group.next()

          newChannel.finish()
          group.cancelAll()
        }
      }

      channel = newChannel
    }

    try await cancel()
  }
}

Pending tasks are always drained from task groups: a task group always waits for all its children to complete.

I can't think of any reason to forbid it, but I'm not sure how useful it is.

1 Like

You just answered it with the previews statement actually. If we also had an ability to explicitly cancel an async let/var it would be possible to reuse the binding for a new child task. It's not really possible to express with async let. Using a task group works, but it gets a bit messy really fast.