Why `AsyncStream` breaks structured concurrency?

I thought with structured concurrency if you cancel parent task all child tasks must be cancelled automatically. Turns out if you have a task which runs in the AsyncStream continuation block it's not the case.

Consider the following code. When I cancel outerTask the for await loop successfully ends and stops printing "received" messages. But the while !Task.isCancelled block in the continuation keeps running forever and "Cancelled!" is never printed. What's happening there? Why cancellation is not propagated?

func makeStream() -> AsyncStream<Int> {
    return AsyncStream { continuation in
        Task {
            var n = 0
            while !Task.isCancelled {
                print("Sending", n)
                continuation.yield(n)
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                n += 1
            }
            print("Cancelled!")
        }
    }
}

let outerTask = Task {
    for await n in makeStream() {
        print("Received", n)
    }
}

try? await Task.sleep(nanoseconds: 3_500_000_000)
outerTask.cancel()
try? await Task.sleep(nanoseconds: 5_000_000_000)
1 Like

Hello @alex.vasenin,

The Task in the stream is not attached to the outer task. This is more visible when you rewrite the code as below:

let stream = makeStream()

let task = Task {
    for await n in stream {
        print("Received", n)
    }
}

So it is normal that the cancellation of the outer task does not propagate to the inner one.

I think that your solution is to look at the result of continuation.yield: when it is .terminated, you can stop the inner task. (I didn't run any code to check my claim, but I really think this is the way to go).

7 Likes

That's correct. Cancelling a parent task will automatically cancel child tasks.

if you have a task which runs in the AsyncStream continuation block [cancellation does not propagate]

Your code in the AsyncStream closure does not spawn a child task. The Task {} API creates an unstructured Task. Unstructured tasks do not automatically inherit cancellation from whomever created them.

While this might not be a solution for your problem I'd like to point out the proper terminology and behavior because this seems to be a constant source of confusion regarding Swift concurrency.

Child Tasks can only be spawned from an async context using async let or task groups. Their common behavior is that child tasks have to terminate before the spawning function returns. Cancellation is inherited from the parent task, as well as task priority and task local values.

Unstructured Tasks on the other hand can be spawned from everywhere, using Task {} or Task.detached {}. The spawning function can return before the unstructured Task finishes (that's why they are called unstructured). Cancellation is not automatically propagated. Task.detached {} also resets priority and actor context.

While the Xcode documentation for Task is somewhat confusing @ole pointed out that this WWDC talk gives a good overview:

21 Likes

Another option could be to set the continuation.onTermination closure, which will run immediately when the parent Task is cancelled. So in this example, you could use

func makeStream() -> AsyncStream<Int> {
    return AsyncStream { continuation in
        let task = Task {
            var n = 0
            while !Task.isCancelled {
                print("Sending", n)
                continuation.yield(n)
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                n += 1
            }
            print("Cancelled!")
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

21 Likes

Even better ! :+1: Bookmarked because I now have some existing code to improve :)

2 Likes

While this setup works it is still unstructured. I would recommend to use the makeStream(of:) factory method on AsyncStream and then pass the continuation into one child task that produces the elements and the stream into another child task that consumes the events.

I know that there are scenarios where this is hard to setup but I would recommend trying to avoid as many unstructured tasks as possible.

6 Likes

I honestly think it might have been a mistake to make Task's initializer @discardableResult.

6 Likes

How would that have helped here? Personally I think Task is poorly named, and the concurrency features as a whole are very opaque in regards to whether things are structured or unstructured, but I don't see how forcing the user to do _ = Task {} to get an async context would've helped.

1 Like

That's the thing; I think having to say _ = Task { } enforces that there's a handle here that gives you much more control over the execution of a task.

To give a basic example,

func textDidChange(to newText: String) {
  Task {
    await someLongRunningOperation(using: text)
  }
}

It's not obvious here that these tasks could pile up uncontrolled. However...

func textDidChange(to newText: String) {
  _ = Task {
    await someLongRunningOperation(using: text)
  }
}

...to me, this makes it more obvious that you could have control over this task if you wanted to, and would lead me to do something like this instead.

var longRunningTask: Task<Void, Never>?

func textDidChange(to newText: String) {
  longRunningTask?.cancel()
  longRunningTask = Task {
    await someLongRunningOperation(using: text)
  }
}

I fear this is getting a little off-topic, though.

8 Likes

Ah-ha! Turns out my assumption that only Task.detached { ... } creates unstructured task was wrong, as Task { ... } also does it, albeit keeping actor and priority.

I really like this solution - this is exactly what I was looking for :+1:

Thank you all for your replies, they were very helpful!

4 Likes

Could you give an example or elaborate a bit more? Iā€™m probably missing something how makeStream(of:) could be an improvement over the @Wouter01 proposed solution?

Especially Iā€™m thinking about creating a generic initialisation helper that would allow to easily create a stream with ability to use async/await inside with proper cancellation/termination handling. Especially thinking about Kotlin's Flow - exactly like this Youre missing async sequence builder like kotlin's Ā· Issue #19 Ā· sideeffect-io/AsyncExtensions Ā· GitHub

1 Like

For the sake of future readers, I might offer a variation of @Wouter01 ā€™s answer; perhaps a more idiomatic solution would be an onTermination clause that only cancels the work if the state is .cancelled. For example, consider a sequence that yields 100 values. It might look like:

func integers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let task = Task {
            for n in 0 ..< 100 {
                continuation.yield(n)
                try await Task.sleep(for: .seconds(1))
            }
            continuation.finish()
        }
        
        continuation.onTermination = { state in
            if case .cancelled = state { task.cancel() }
        }
    }
}

This will finish the stream if it finishes yielding values, and will only cancel the asynchronous work if the sequence, itself, was cancelled. But I personally always check the state so that I donā€™t bother cancelling the task if it may have finished on its own, in the course of its natural lifecycle. (And, yes, cancelling a task that has already finished is a NOOP, so checking the state is technically not necessary, but I think this pattern makes the intent more clear.)

I also might be inclined to avoid the name makeStream to avoid confusion with the AsyncStream static method of the same name. In this case, I might call the asynchronous sequence of integer values integers, or something like that.

Now, in your example of a never-ending sequence, this is a distinction without difference, as you might not need to ever finish, nor the check of the state of cancelled in the onTermination closure (as cancellation is the only way it would ever end), but I offer this rendition for the sake of clarity.


@nonameplum asked:

I will let @FranzBusch answer that question, but he is right, that in general, we are well advised to remain within structured concurrency when we can (so we enjoy automatic cancellation propagation). But in this example, unstructured concurrency is fine (as long as you implement cancellation logic, like above). The typical unstructured concurrency anti-pattern is where one uses Task {ā€¦} (or Task.detached {ā€¦}) without implementing cancellation logic. (And you will see that sort of sloppy implementation when perusing Task {ā€¦} examples online.)

But as long as you do handle the cancellation flow, unstructured concurrency is fine. This is why we have unstructured concurrency, to afford us this sort of fine-grained control. Just remember that when using unstructured concurrency, the burden of handling cancellation rests on your shoulders. But, IMHO, in this example, the unstructured concurrency with explicit cancellation logic is fine. The typical ā€œprefer structured concurrency where you canā€ advice does not apply in this case.


If you really wanted to use makeStream(of:bufferingPolicy:) to wrap this asynchronous task, it might look like:

func integers() -> AsyncStream<Int> {
    let (stream, continuation) = AsyncStream<Int>.makeStream()

    let task = Task {
        for n in 0 ..< 100 {
            continuation.yield(n)
            try await Task.sleep(for: .seconds(1))
        }
        continuation.finish()
    }
    
    continuation.onTermination = { state in
        if case .cancelled = state { task.cancel() }
    }
    
    return stream
}

The idea would be that you would still use an onTermination clause to manually stop the asynchronous work when the sequence is cancelled. In this particular case, I do not think this offers much benefit over the AsyncStream(_:bufferingPolicy:_:) pattern, but I include this for the sake of completeness. Personally, I use this makeStream pattern when integrating with legacy patterns (completion handlers, delegate methods, etc.), where it is useful to save the AsyncStream.Continuation in its own property. But, technically, one can use it here, too.


I guess if we are contemplating other patterns, for the sake of completeness, another alternative is to implement your own AsyncSequence, from scratch. But thatā€™s more complicated and doesnā€™t offer too much value here, IMHO. There are also other variations where you might use swift-async-algorithms, which has first-class cancellation support, but I think that is also beyond the scope of the question.

somewhat tangential, but since this has been resurrected, i think a clarification about the semantics of AsyncStream cancellation may be useful.

it may be somewhat surprising, but this is not precisely the behavior that the onTermination closure provides. the documentation for the property states that it is (emphasis mine):

A callback to invoke when canceling iteration of an asynchronous stream.

that is, onTermination will be called only when there is some client code awaiting the next element of the stream when the Task cancellation occurs[1]. for instance, if it's in the midst of handling an element when this happens, cancellation will not propagate.

here's a contrived example using the original makeStream() implementation that illustrates this fact:

func makeStream() -> AsyncStream<Int> {
    return AsyncStream { continuation in
        let task = Task {
            var n = 0
            while !Task.isCancelled {
                print("Sending", n)
                continuation.yield(n)
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                n += 1
            }
            print("Cancelled!")
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

func test_task_cancel() async -> AsyncStream<Int>? {
  let stream = makeStream()

  await withTaskGroup(of: Void.self) { group in
    group.addTask {
      for await n in stream {
        print("got element: \(n)")
        do {
          print("sleeping in for-await body")
          // sleep long enough that we can cancel the child
          // while we're not actively awaiting the stream
          try await Task.sleep(for: .seconds(3))
          print("awoke in for-await body")
        } catch {
          print("child task canceled")
          break
        }
      }
    }

    do {
      try await Task.sleep(for: .seconds(2))
      print("cancelling group")
      group.cancelAll()
    } catch {
      print("group canceled")
    }
  }

  return stream
}

Task {
  let stream = await test_task_cancel()

  if let stream {
    print("consuming remaining elts:")
    for await n in stream {
      print("got: \(n)")
    }
    print("consumed all!")
  }
  print("exiting root Task")
}

depending on the particular timing of cancellation, the underlying stream may or may not be terminated. perhaps this is fine in some cases (if the stream is torn down as part of this process, it will also run the termination handler), but it could lead to surprising behavior if one's mental model is that onTermination is always invoked when a Task using the stream is cancelled.


  1. technically it is also invoked when the stream's internal storage is deinitialized. ā†©ļøŽ

2 Likes

While cancellation is one part of why you should avoid unstructured concurrency there is more to this. Structured concurrency allows you to propagate context down the task tree from the call site including cancellation but not only. It also includes task priority and task locals. While you handle task cancellation and cancel the unstructured task you don't handle priority propagation. In fact, you can't. There are currently no-hooks that allows you to listen to task priority changes in the current task nor to change the priority of a task manually.

In general, if you implement an AsyncSequence you can uphold all of those guarantees (cancellation, priority, locals); however, AsyncSequences are often used to bridge from one task to another or from a non-Swift Concurrency context to one. In those scenarios, cancellation is definitely the most important thing but almost all of them are doable without an unstructured task. Now this might require a different AsyncSequence than AsyncStream but it is possible.

This is an interesting concept and I agree having such a sequence builder would be great but this is currently not possible to implement without an unstructured task somewhere since the AsyncSequence protocol lacks a way to let work run concurrently and most importantly across a call to next. It would be great if we could find a solution to this problem since we have come across this in many algorithm implementations such as merge or zip.

3 Likes

I apologize for splitting hairs, but I believe that you have drawn two incorrect conclusions from your example:

  1. You emphasized that the onTermination closure is called when ā€œcanceling iteration of an asynchronous stream.ā€ That is true, but note that it is also called when the ā€œstream finished as a result of calling the continuationā€™s finish methodā€. See the Continuation.Termination documentation. This is easily manifested if you have an AsyncStream that actually calls finish (and is allowed to finish before cancellation).

  2. You suggest that ā€œonTermination will be called only when there is some client code awaiting the next element of the streamā€. This is not quite correct. The onTermination closure is called as soon as the stream falls out of scope. You do not need to be awaiting a value for the termination to be recognized. The typical pattern (shown below) is to have the task have a local AsyncStream, in which case, as soon as the task is cancelled, the sequence will effectively be terminated immediately (whether the next element is being awaited or not).

Consider the following, where I yield values every two seconds, await a full minute before awaiting the next value, but cancel the whole thing after five seconds:

/// Yield values 0 ..< 20 with two second delay between each.
///
/// When the stream is terminated, cancel the task yielding the values.

func integers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let task = Task {
            for n in 0 ..< 20 {
                print("Yielding", n)
                continuation.yield(n)
                try await Task.sleep(for: .seconds(2))
            }
            continuation.finish()
        }
        
        continuation.onTermination = { state in
            print("onTermination; state = \(state)")
            task.cancel()
        }
    }
}

func testTaskCancel() async {
    let streamTask = Task {
        let stream = integers()

        do {
            for await n in stream {
                print("Got element: \(n)")
                try await Task.sleep(for: .seconds(60))
            }
        } catch {
            print("Caught error", error)
        }
    }
    
    try? await Task.sleep(for: .seconds(5))
    print("Cancelled after 5 seconds")
    streamTask.cancel()
}

You get the result that you would expect, namely three values are yielded, only the first of which is consumed, but the whole thing is canceled after 5 seconds:

Yielding 0
Got element: 0
Yielding 1
Yielding 2
Cancelled after 5 seconds
Caught error CancellationError()
onTermination; state = cancelled

The issue with your example is that you are cancelling your group task that is iterating through the stream, but you are not letting the AsyncStream fall out of scope (because you return it), and as a result, the stream will keep running until it eventually falls out of scope.

3 Likes

I read your original post (and I think @nonameplum may have, as well) to suggest that you could easily create a cancellable AsyncStream sequence with makeStream(of:bufferingPolicy:) without introducing unstructured concurrency. I was hoping you could post an example.

For me, when I need to create a cancellable asynchronous AsyncStream, that is when I reach for unstructured concurrency (your caveats notwithstanding).


Obviously, making your own AsyncSequence is one way around it. Or you can use the one provided by async from swift-async-algorithms:

func makeStream() -> any AsyncSequence<Int, Never> {
    let numbers = (0...).async
    let ticks = AsyncTimerSequence(interval: .seconds(1), clock: .continuous)

    return zip(numbers, ticks)
        .map { $0.0 }
}

But I inferred from your earlier answer that you were suggesting you could do this easily with AsyncStream.makeStream(of:bufferingPolicy:), and was wondering if you could share your suggested implementation.

1 Like

Okay, I see this somehow got to the 16th post with no one giving an actual example of how this would be done in true structured concurrency. So here goes:

func withStream<T>(_ body: @Sendable (AsyncStream<Int>) async throws -> T) async rethrows -> T {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    async let _: () = {
        var n = 0
        while !Task.isCancelled {
            print("Sending", n)
            continuation.yield(n)
            try? await Task.sleep(nanoseconds: 1_000_000_000)
            n += 1
        }
        print("Cancelled!")
     }()

    return try await body(stream)
}

Usage is:

async let outerTask = withStream() { stream in
    for await n in stream {
        print("Received", n)
    }
}

try? await Task.sleep(nanoseconds: 3_500_000_000)
outerTask.cancel()
try? await Task.sleep(nanoseconds: 5_000_000_000)

The use of the with* pattern is less than pretty, but is here to stay until ~Copyable evolves to the point of replacing it. (At a bare minimum ~SyncConsumable is needed in order to be able to properly clean up task)

An alternative is to take a task group as an input:

func makeStream(_ group: TaskGroup<Void>) -> AsyncStream<Int> {
    return AsyncStream { continuation in
        group.addTask {
            var n = 0
            while !Task.isCancelled {
                print("Sending", n)
                continuation.yield(n)
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                n += 1
            }
            print("Cancelled!")
        }
    }
}

Then the usage becomes:

async let outerTask = withTaskGroup(of: Int.Self) { group in
    defer { group.cancelAll() }
    for await n in makeStream(group) {
        print("Received", n)
    }
}

try? await Task.sleep(nanoseconds: 3_500_000_000)
outerTask.cancel()
try? await Task.sleep(nanoseconds: 5_000_000_000)

The general rules are:

  1. Any use of Task { violates structured concurrency. If you want to spawn a child task use either async let or GroupTask.addTask. Task { is basically banned in true structured concurrency.
  2. If you want to return a value that depends on a task running in the background, you need to either pass in a TaskGroup to manage the lifetime of that task, or use a with* structure to ensure the lifetime of the task is limited to a closure.

The key part to structured concurrency isn't the delivery of task cancel. It's the strict definition of task lifetimes. It prevents tasks from being leaked, by requiring tasks adhere to bounded scopes, from which they cannot escape.

The "cheating the structure" solution using continuation.onTermination actually still allows the task to leak: If you call makeStream() but don't perform for await on it, and don't cancel outerTask, the stream's background task will leak (and retain a strong reference to the AsyncStream, via continuation, which should be collected at this point). Structured concurrency makes sure the stream's background task cannot outlive outerTask, even if for await is not used.

Structured concurrency is basically RAII for tasks. Which is why it should one day benefit from true RAII via ~Copyable.

3 Likes

This documentation on Task.detached and this thread, is leaving me very confused.

Discussion
Donā€™t use a detached task if itā€™s possible to model the operation using structured concurrency features like child tasks. Child tasks inherit the parent taskā€™s priority and task-local storage, and canceling a parent task automatically cancels all of its child tasks. You need to handle these considerations manually with a detached task.
You need to keep a reference to the detached task if you want to cancel it by calling the Task.cancel() method. Discarding your reference to a detached task doesnā€™t implicitly cancel that task, it only makes it impossible for you to explicitly cancel the task.

What is this? Is this a proposal or something I should know about structured concurrency? How can I get a structured task that can be cancelled like that?

No, there are just a bunch of syntax errors that are clouding the issue:

  • The () is in the wrong place.
  • The variable n was never declared.
  • The closure parameter stream was probably intended to be AsyncStream<Int> in this case.
  • As an aside, the async let does not need to be explicitly canceled and awaited; that happens automatically when it falls out of scope. See SE-0317ā€™s Implicit async let awaiting.

So, perhaps something like:

func withStream<T: Sendable>(_ body: @Sendable (AsyncStream<Int>) async throws -> T) async rethrows -> T {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    async let _: () = {
        var n = 0
        while !Task.isCancelled {
            print("Sending", n)
            continuation.yield(n)
            try? await Task.sleep(for: .seconds(1))
            n += 1
        }
        print("Cancelled!")
    }()

    return try await body(stream)
}

Or, if it was a stream that could finish, youā€™d might do something like:

func withStream<T: Sendable>(_ body: @Sendable (AsyncStream<Int>) async throws -> T) async rethrows -> T {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    async let _: () = {
        for n in 0 ..< 20 {
            print("Sending", n)
            continuation.yield(n)
            try await Task.sleep(for: .seconds(1))
        }
        continuation.finish()
    }()

    return try await body(stream)
}

Itā€™s not a pattern I personally would adopt, but I think this (or something like it) was the intent.

3 Likes