Why `AsyncStream` breaks structured concurrency?

I thought with structured concurrency if you cancel parent task all child tasks must be cancelled automatically. Turns out if you have a task which runs in the AsyncStream continuation block it's not the case.

Consider the following code. When I cancel outerTask the for await loop successfully ends and stops printing "received" messages. But the while !Task.isCancelled block in the continuation keeps running forever and "Cancelled!" is never printed. What's happening there? Why cancellation is not propagated?

func makeStream() -> AsyncStream<Int> {
    return AsyncStream { continuation in
        Task {
            var n = 0
            while !Task.isCancelled {
                print("Sending", n)
                continuation.yield(n)
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                n += 1
            }
            print("Cancelled!")
        }
    }
}

let outerTask = Task {
    for await n in makeStream() {
        print("Received", n)
    }
}

try? await Task.sleep(nanoseconds: 3_500_000_000)
outerTask.cancel()
try? await Task.sleep(nanoseconds: 5_000_000_000)
1 Like

Hello @alex.vasenin,

The Task in the stream is not attached to the outer task. This is more visible when you rewrite the code as below:

let stream = makeStream()

let task = Task {
    for await n in stream {
        print("Received", n)
    }
}

So it is normal that the cancellation of the outer task does not propagate to the inner one.

I think that your solution is to look at the result of continuation.yield: when it is .terminated, you can stop the inner task. (I didn't run any code to check my claim, but I really think this is the way to go).

6 Likes

That's correct. Cancelling a parent task will automatically cancel child tasks.

if you have a task which runs in the AsyncStream continuation block [cancellation does not propagate]

Your code in the AsyncStream closure does not spawn a child task. The Task {} API creates an unstructured Task. Unstructured tasks do not automatically inherit cancellation from whomever created them.

While this might not be a solution for your problem I'd like to point out the proper terminology and behavior because this seems to be a constant source of confusion regarding Swift concurrency.

Child Tasks can only be spawned from an async context using async let or task groups. Their common behavior is that child tasks have to terminate before the spawning function returns. Cancellation is inherited from the parent task, as well as task priority and task local values.

Unstructured Tasks on the other hand can be spawned from everywhere, using Task {} or Task.detached {}. The spawning function can return before the unstructured Task finishes (that's why they are called unstructured). Cancellation is not automatically propagated. Task.detached {} also resets priority and actor context.

While the Xcode documentation for Task is somewhat confusing @ole pointed out that this WWDC talk gives a good overview:

19 Likes

Another option could be to set the continuation.onTermination closure, which will run immediately when the parent Task is cancelled. So in this example, you could use

func makeStream() -> AsyncStream<Int> {
    return AsyncStream { continuation in
        let task = Task {
            var n = 0
            while !Task.isCancelled {
                print("Sending", n)
                continuation.yield(n)
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                n += 1
            }
            print("Cancelled!")
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

18 Likes

Even better ! :+1: Bookmarked because I now have some existing code to improve :)

1 Like

While this setup works it is still unstructured. I would recommend to use the makeStream(of:) factory method on AsyncStream and then pass the continuation into one child task that produces the elements and the stream into another child task that consumes the events.

I know that there are scenarios where this is hard to setup but I would recommend trying to avoid as many unstructured tasks as possible.

4 Likes

I honestly think it might have been a mistake to make Task's initializer @discardableResult.

6 Likes

How would that have helped here? Personally I think Task is poorly named, and the concurrency features as a whole are very opaque in regards to whether things are structured or unstructured, but I don't see how forcing the user to do _ = Task {} to get an async context would've helped.

1 Like

That's the thing; I think having to say _ = Task { } enforces that there's a handle here that gives you much more control over the execution of a task.

To give a basic example,

func textDidChange(to newText: String) {
  Task {
    await someLongRunningOperation(using: text)
  }
}

It's not obvious here that these tasks could pile up uncontrolled. However...

func textDidChange(to newText: String) {
  _ = Task {
    await someLongRunningOperation(using: text)
  }
}

...to me, this makes it more obvious that you could have control over this task if you wanted to, and would lead me to do something like this instead.

var longRunningTask: Task<Void, Never>?

func textDidChange(to newText: String) {
  longRunningTask?.cancel()
  longRunningTask = Task {
    await someLongRunningOperation(using: text)
  }
}

I fear this is getting a little off-topic, though.

7 Likes

Ah-ha! Turns out my assumption that only Task.detached { ... } creates unstructured task was wrong, as Task { ... } also does it, albeit keeping actor and priority.

I really like this solution - this is exactly what I was looking for :+1:

Thank you all for your replies, they were very helpful!

4 Likes