Why doesn't `TaskGroup.addTask(priority:operation:)` return the task?

The current TaskGroup design is fantastic in that it offers a very lightweight syntax for iterating over the task results in real time, as they complete.

This is a very fine use case and most likely what most people would want to do (if they need to process the task results at all).

There is, however, another use case that this design doesn't cater for, like in Sundell's lib here: CollectionConcurrencyKit/CollectionConcurrencyKit.swift at main · JohnSundell/CollectionConcurrencyKit · GitHub. There, the results aren't processed in real time, but preserving the order is important. The code in the repo uses top-level tasks to preserve the original order, but that forfeits the group's hierarchy benefits of automatic cancellation and more.
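To make the trade-off concrete, here is a minimal sketch of the top-level-task approach. It is not the library's actual code; the function name and the doubling operation are made up for illustration.

```swift
// Hypothetical helper (not from CollectionConcurrencyKit): starts one
// unstructured task per input, then awaits them in the original order.
func orderedResultsUsingUnstructuredTasks(_ inputs: [Int]) async -> [Int] {
    // Each Task starts right away and is NOT a child of the calling task,
    // so cancelling the caller does not cancel these tasks automatically.
    let tasks = inputs.map { input in
        Task { input * 2 }
    }

    var results: [Int] = []
    for task in tasks {
        // Awaiting in submission order preserves the input order,
        // regardless of which task happens to finish first.
        results.append(await task.value)
    }
    return results
}
```

The order comes for free because the task handles are kept in an array, which is exactly what a task group does not hand out.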

I'm looking at the source code of TaskGroup.addTask:

let flags = taskCreateFlags(
      priority: priority, isChildTask: true, copyTaskLocals: false,
      inheritContext: false, enqueueJob: true,
      addPendingGroupTaskUnconditionally: true
)

// Create the task in this group.
_ = Builtin.createAsyncTaskInGroup(flags, _group, operation)

It seems to me that the method intentionally doesn't return the added task, but I'm not sure what the motivation for this is. It seems trivial to return that task so that developers are able to either 1) iterate over the group to process results on a first-come, first-served basis or 2) hold on to the tasks returned by addTask and decide the order for themselves.

Here's what processing child task results in order could look like:

	await withTaskGroup(of: Int.self, body: { group in
		let tasks = (0..<100).map { i in
			group.addTask { i }
		}
		for task in tasks {
			print(await task.value)
		}
	})

It doesn't because it can't do so safely.

For the same reason you can't just "get the current task". The only way to do this is withUnsafeCurrentTask which says it out loud that you need to be careful with what you do with this task reference or you'll get undefined behavior (reading deallocated memory and friends).
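As a rough illustration of that API, here is a small sketch. The function name is made up; only withUnsafeCurrentTask and the isCancelled property come from the standard library.

```swift
// Hypothetical helper: inspects the current task, if there is one.
func describeCurrentTask() -> String {
    withUnsafeCurrentTask { (task: UnsafeCurrentTask?) -> String in
        // `task` is nil when this code is not running inside a task.
        guard let task = task else { return "not running inside a task" }
        // Reading properties inside the closure is fine. Storing `task`
        // somewhere and using it after the closure returns is the
        // undefined behavior the name warns about.
        return "cancelled: \(task.isCancelled)"
    }
}
```

Everything the closure needs is copied out as a plain String, so no task reference escapes.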

There was talk of being able to mark values as "not able to escape"; with that, we'd be able to implement this safely. The group's tasks are allocated on the stack, and they must be cleaned up as you exit the group -- storing any task reference beyond its lifetime would be unsafe.

The group parameter of withTaskGroup would actually benefit from this as well.


Very early iterations of the API (you can see the Swift Evolution reviews) DID return the task actually. But we then discovered that in order to optimize these task allocations, we cannot do so safely and had to remove this. The API could exist if we allocated such tasks less efficiently (not in the task local stack allocator), in theory at least.

Thanks Konrad, I can see how having control over the tasks themselves could be dangerous, I get it.

Back to the original problem of processing concurrent results in-order — do you see a different way to return the results in order without having to sort them which might be performance heavy?

Not sure how common of a problem this is, but it almost feels that it'd be useful to (instead of accessing the tasks) be able to configure the iterator. Something like:

await withTaskGroup(of: Int.self, body: { group in
	(0..<100).forEach { i in
		group.addTask { i }
	}

	var iterator = group.makeAsyncIterator(
		deliveryStrategy: .originalOrder
	)
	while let result = await iterator.next() {
		print(result)
	}
})

Groups are not intended to be ordered. It is not just a property of the iterator, but of how the entire group works -- the group has to know up front how it will be asked to gather results; finding that out after the work has been kicked off would be wildly inefficient.

Streams totally can be though, and most often are.

What you're asking for is a "parallel map*", which would be a simpler way to express this -- then there is no "foreach x, add task, then collect them", there is just "map In to Out, with (some) parallelism".

I'd rather invest in offering a parallel map on AsyncSequence and friends than make the group fit what streams excel at.

// cc @Philippe_Hausler since his interests lie in offering more APIs on async sequences.

We could revisit this once we have some hard data in hand showing e.g. that a stream solution is inefficient, but for now let's not jump the shark here imho.

* or mapAsync(parallelism:), as I called this in a previous life :wink:

Ok, that's fair. My motivation for these questions is that I can build whatever I need but I get to talk to a lot of people with little understanding of async programming and some of these APIs are difficult to understand.

Probably (as a final note) a general paragraph on the type's doc page about how groups are intended to be used, and how best to use them, could help -> TaskGroup | Apple Developer Documentation

map, filter, etc. on AsyncSequence all take async closures (and there are throwing versions too). But those don't map in parallel. They just await each closure's execution as part of the iteration.
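A small sketch of that behavior, with made-up names and an artificial delay standing in for real work:

```swift
// Hypothetical example: an async transform applied with AsyncSequence.map.
func doubledSequentially() async -> [Int] {
    // A finite stream of 1, 2, 3.
    let numbers = AsyncStream<Int> { continuation in
        for i in 1...3 { continuation.yield(i) }
        continuation.finish()
    }

    // The transform is async, but map awaits it for one element before
    // the next element is pulled, so the delays add up instead of overlapping.
    let doubled = numbers.map { value -> Int in
        try? await Task.sleep(nanoseconds: 1_000_000)
        return value * 2
    }

    var results: [Int] = []
    for await value in doubled {
        results.append(value)
    }
    return results
}
```

The output order matches the input order here precisely because nothing runs concurrently.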

It often feels like we should go wide with concurrency, but small executions may not be optimal. For example, dispatch_apply does not always mean your program magically becomes more performant; often, on the contrary, it can become slower because the per-item execution overhead becomes a burden. In the development of AsyncSequence I used a tidbit that I learned from working on Combine: flatMap defaulting to a concurrent, unbounded width is not only hard to understand but also often not the right answer. That is why AsyncSequence.flatMap is more akin to a version of Combine's flatMap with a maximum of 1 concurrent operation.

That all being said there are good reasons to have "going wide" style APIs. I would guess there would be a family of these as the previously mentioned package does. However the one alteration I would say might be interesting/useful is to limit the maximum number of items to saturate at. One of the early prototypes of AsyncFlatMapSequence did just that - so I know it is possible. I would also guess that API family may not ever be the "common" tools of the trade and would likely be well served as living in a package outside of the standard tools folks reach for.

I do kind of like the name concurrentMap, primarily since map on an AsyncSequence is already asynchronous.

That API shape, which has a knob for the "maximum level of parallelism", is pretty decent (perhaps modulo my dispute over the name). That knob definitely makes it more complicated, but I think that is probably worth it.
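For illustration only, one way such a knob could look on Sequence. The name concurrentMap, the maxConcurrency parameter, and the ordered output are assumptions made for this sketch, not an existing or proposed standard library API.

```swift
extension Sequence where Element: Sendable {
    /// Hypothetical API: maps elements concurrently, running at most
    /// `maxConcurrency` transforms at a time, and returns results in input order.
    func concurrentMap<T: Sendable>(
        maxConcurrency: Int,
        _ transform: @escaping @Sendable (Element) async -> T
    ) async -> [T] {
        precondition(maxConcurrency > 0, "maxConcurrency must be positive")
        return await withTaskGroup(of: (Int, T).self) { group in
            var results: [Int: T] = [:]
            var count = 0
            for element in self {
                // Once the window is full, wait for one child task to
                // finish before adding another.
                if count >= maxConcurrency, let finished = await group.next() {
                    results[finished.0] = finished.1
                }
                let index = count
                // Tag each result with its input position.
                group.addTask { (index, await transform(element)) }
                count += 1
            }
            // Drain the child tasks that are still running.
            for await finished in group {
                results[finished.0] = finished.1
            }
            // Reassemble by index; no sorting is needed.
            return (0..<count).map { results[$0]! }
        }
    }
}
```

A call such as `await urls.concurrentMap(maxConcurrency: 4) { await fetch($0) }` (with `urls` and `fetch` being placeholders) would then keep structured cancellation while still returning results in input order. This version buffers every result before returning.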

The questions that likely need to be explored are:

  • Can that parallelism be generalized into a tool for all of this family of things to use?
  • Does it make sense to have a way to express ordered output or unordered output?
  • How can it apply to both AsyncSequence and Sequence?
  • Can we do something specialized with Collection, RandomAccessCollection or even RangeReplaceableCollection?
  • Do a subset of these belong in a lower level context? e.g. living in the concurrency library? or do they belong in a higher level package?
  • If they belong in a higher level package are there things that can be added as primitives to the concurrency library to make that type of thing easier/safer/possible?

You can do it without sorting, but there is no way around having to buffer at least some of the results (that arrive out of order) before starting to produce the results.

Here is a naïve sample that buffers all of the results, but does not sort:
import _Concurrency
import PlaygroundSupport

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

typealias OrderedValue<T> = (value: T , order: Int)

func myTask(_ input: Int) async -> Int {
    print("    begin myTask(\(input))")
    try? await Task.sleep(nanoseconds: .random(in: 5000...50000))
    print("    end myTask(\(input))")
    return input * 20
}

let taskCount = 10

Task.detached { await withTaskGroup(of: OrderedValue<Int>.self) { group in
    
    for order in 0..<taskCount {
        print("  Submit task #\(order)")
        group.addTask { (await myTask(order), order) }
    }
    
    var results = [Int?](repeating: nil, count: taskCount)
    
    for await aResult in group {
        print("  Receive result of task #\(aResult.order)")
        results[aResult.order] = aResult.value
    }
    
    print( "*** Producing the collected results in order:")
    for i in 0..<taskCount {
        print("  task #\(i) -> \(results[i]!)")
    }
    
    page.finishExecution()
}}

In a more realistic scenario, you should start producing as soon as the first in-order result is received and buffer only the ones that arrive out of order.

I probably haven't spent enough time to think of all the constraints of how any of this can fit in the standard library hierarchy (I started this thread to actually inquire about the result of addTask) but I think that, assuming consumers know what they're doing, there is a use case in which you'd like to do work in parallel.

Whether this belongs in stdlib or another library (or possibly not provided by default at all) — I don't know.

I would personally like it if there was a 1st party API that allows me to easily parallelize. In the past I had to put something together whenever I needed that, for example https://github.com/apple/swift-docc/blob/main/Sources/SwiftDocC/Utility/Collection%2BConcurrentPerform.swift#L32-L34 but a stdlib API would be much better.

For future readers, who might wonder how to do this:

A sample that processes results in order and as soon as possible:
import _Concurrency
import PlaygroundSupport

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

typealias OrderedValue<T> = (value: T , order: Int)

func myTask(_ input: Int) async -> Int {
    print("    - begin myTask(\(input))")
    try? await Task.sleep(nanoseconds: .random(in: 10000...100000))
    print("    - end myTask(\(input))")
    return input * 20
}

func processTaskResult(value: Int, order: Int) async {
    try? await Task.sleep(nanoseconds: .random(in: 10000...100000))
    print("Processed task result: \(value) (original order: \(order))")
}

let taskCount = 10

Task.detached { await withTaskGroup(of: OrderedValue<Int>.self) { group in
    
    for order in 0..<taskCount {
        print("  - submit task #\(order)")
        group.addTask { (await myTask(order), order) }
    }
    
    var resultBuffer = [Int: Int]()
    var next = 0
            
    print( "*** Producing the collected results in order:")
    for await aResult in group {
        print("  - receive result of task #\(aResult.order)")
        resultBuffer[aResult.order] = aResult.value
        while next < taskCount, let nextResult = resultBuffer[next] {
            await processTaskResult(value: nextResult, order: next)
            resultBuffer[next] = nil
            next += 1
        }
    }

    precondition(resultBuffer.isEmpty, "Program bug! Some results were never consumed.")
    
    page.finishExecution()
}}