TaskGroup: Keeping a Reference to Child Tasks?

Context:

Suppose I want to download 300 images in the background using Swift Concurrency. I'd like three things:

  1. As much parallelism as possible.
  2. My caller to receive each image as it's downloaded, rather than waiting for all of them to finish.
  3. If another caller asks for an image-download that's already queued up, that caller should not start a new, second download.

Approach:

TaskGroup + AsyncStream seem like a good fit for #1 and #2. I could do something like this:

func images(for urls: [URL]) -> AsyncStream<(URL, NSImage)> 
{
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: (URL, NSImage).self) { group in
                for url in urls {
                    group.addTask { await (url, self.downloadImage(url: url)) }
                }

                for await tuple in group {
                    continuation.yield(tuple)
                }

                continuation.finish()
            }
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

My caller can for await in the sequence and process images as they arrive.

But the trouble is point #3. I can't keep a reference to the child tasks that are added to the group, so if a new caller asks for a URL that's already in-flight, I can't have that caller await the child task within the TaskGroup.

To solve #3, I see two possibilities:

  1. Use separate task.detached Tasks and keep a reference to each one in a [URL: Task<...>] dictionary so that I can have new callers await the already-running Task. The downside here is that this is much less efficient than a TaskGroup.

  2. Keep a Set<URL> of the URLs that are "in" the TaskGroup and, if a new caller arrives, have it await the final result of the entire TaskGroup, then look up the NSImage assigned to the requested URL. The downside here is that the new caller must wait for all 300 images to finish downloading before it gets the one it wants.

Question:

Is there a better approach? Ideally, I'd like to have the new caller simply await the result of the specific child Task that is associated with a given URL. But I don't see how to do that with TaskGroup.

1 Like

There's currently no way to get a reference to a child task, which is effectively what you'd want here.

We don't have it because offering a reference to a structured child task is not safe unless we can prove you won't be able to escape it. We don't have non-escapeable values yet (though I think this is being worked on or was going to be at some point?). If we had those, then we can form a "non-escapeable ChildTask" what you could inspect or cancel etc.

Worth creating an issue asking for this on github.
I fear that today you'll be forced into unstrucutred concurrency for this -- also, please prefer just Task{} rather than the whole detached when possible, as you'd want to inherit task locals etc usually.

5 Likes

@ktoso Thanks! My "download" example is contrived because it's easier to explain. In reality, I have 300 video files on a LAN-based server and this method is going to open them, grab a screenshot at specific frames, and then return those images. There is latency involved, so I want this to happen off the main thread for sure.

The right way to guarantee that is with Task.detached, no? I could use just Task{} if I were going to call something like NSURLSession that steps off the main thread to complete the request, but in this case it seems like detached would be the right call?

1 Like

Or put your logic into another actor, not the main actor, and use Task.

Task{} only inherits enclosing actor execution context if you close over its self, so it's somewhat more delicate than that.

But yeah if you don't care about priority, locals, etc... you can detach, but I feel people reach for it a bit quickly tbh.

@ktoso My understanding of Actors may be off. I thought their primary purpose was to guard against race conditions on shared mutable state by serializing access to that state. My model of them was essentially "a serial queue with fancy lock replacements."

If I create an Actor and then use Task {}, does that not perform the actions one-by-one in sequence? I presumed that an Actor was confined to a single thread from the concurrent thread pool.

1 Like

It (Task) does if it's called in a serial context (e.g. in an actor's context).

In that situation, you have to use Task.detached or TaskGroup to escape the current actor context (if there is one), i.e. to actually get parallelism.

By analogy to GCD, it's essentially like all the methods of spawning new tasks use the current queue (which might be serial, or might be concurrent), except for Task.detached and TaskGroup which implicitly and only target the global concurrent queue.

1 Like

@wadetregaskis I see. So is it correct to say that a Task created in an Actor, but which accesses no stored state in that Actor will be executed in parallel with other Tasks running on that Actor? In other words, if the Task does not capture self, it does not block other Tasks from running simultaneously?

I had a similar topic at some point and this was the response:

My naive thinking is that we don't really need to guard the child task escaping through a reference. The task group guarantees that all child tasks will be done when the group itself is finished, so the reference would only return either a value or an error and the explicit cancelation would be a no-op.

1 Like

Here's a recent thread on this: @_inheritActorContext does not inherit the actor context in certain situations You can confirm the behavior yourself based on those snippets.

As usual with concurrency and guarantees it is not quite as naive and trivial to declare it'll be simple...

Do we guarantee that they're finished or finished and DESTROYED? Because that's what we actually do today.

It's worth an in depth investigation, but it's not as simple as just a missing method we can just throw on there. It might have big implications.

3 Likes

I didnโ€™t meant to put it in a way that the implementation of it would be simple or if there was no implication. I'm obviously no expert on that field, hence naive thinking at the beginning of my sentence.

However even if we would not allow to explicitly obtain a reference to the child task, we could have a token based system instead. Capturing tokens would not change the current behavior and they would be only valid within the groups main scope.

I think the more challenging aspect of explicit child task cancellation would lie with async let where the current async function scope is kinda like a task group but with some other implicit behaviors.

I definitely support the idea of being able to control the cancellation of child task. I implemented an unstructured solution but it would be much more elegant if there was a structured one instead.

1 Like

I have thought about a token based system before as well something like this

await withTaskGroup(of: Void.self) { group in
    let childTaskToken1 = group.addTask {}
    let childTaskToken2 = group.addTask {}

    await group.wait(for: childTaskToken1)
    group.cancel(childTaskToken2)
}

While I love the pragmatic approach to the problem using ~Escapable types would make this even nicer. Until then anyone could implement this with Task {} and a token system in a library.

2 Likes

It doesn't matter what the task captures, it is still bound to the actor.

You can see this easily with e.g.:

Task {
    while true {}
}

while true {
    print("I'm alive!")
    try! await Task.sleep(for: .seconds(1))
}

The exact behaviour of that example might vary depending on Swift compiler version and possibly other factors. For me, with Swift 5.9.2, it prints "I"m alive!" once and then never again. That's because the spawned Task doesn't take over the current actor's (@MainActor) execution immediately when spawned, it's merely put on the queue (so to speak). Only when the currently executing task (the top level code in this example) first yields does the child task get to execute. And since it just spins forever, never relinquishing the actor, nothing else ever runs again.

If you change it to use Task.detached, or withTaskGroup, @MainActor won't be blocked and will print that it's alive every second, forever. The child Task will chew on a separate CPU coreยน.


ยน Technically this isn't guaranteed, as it's valid for there to be only one thread available for the entire program, I believe. But in reality you have the @MainActor thread and at least one thread in the global concurrency pool (usually one per CPU core on your given machine, although the iOS Simulators have sometimes restricted that to one thread for some reason).

1 Like

As long as there's no requirement to return the result of an already-yielded image, I'm pretty sure you could rig up a solution with continuations, which would look roughly like:

  1. Your second caller awaits on a withCheckedContinuation/withUnsafeContinuation call, the body of which registers the URL and its continuation instance into some collection managed by a dedicated actor
  2. Right before/after your TaskGroup/AsyncStream body yields a new image, it checks with the actor to see if there's a continuation registered for that URL, resuming it and returning the image if so
  3. After all the queued images are downloaded, clean up any dangling continuations (throw an error, download the image for maybe the second time, etc.)

This does involve the performance hit of suspensions for checking with the actor on every image download, but should meet the rest of your criteria and preserve the use of TaskGroup instead of having to manually manage the tasks.

2 Likes

@MPLewis Ooo, that's clever. This is a simplified example of what I'm actually building, which is essentially an image cache (where the images are captured frames of video using AVFoundation). I can solve the already-yielded-image problem by simply caching the images on the Actor when the child Task completes and registering the continuation only on cache-misses.