Parallel computation DAG / Shared Futures?

I have a computation I'd like to parallelize and I'm not sure what Swift facilities support the use case. You might summarize it like this:

func compute(_ input: [Key]) -> [Key: Value]

The result will contain all the keys that were in the input, plus some additional ones that were discovered as part of the computation. There's a discovered dependency DAG among all the values.

To give an overly-trivial example of the shape of the computation, we could say that keys are integers and the result is a dictionary mapping n into n!:

func compute(_ input: [Int]) -> [Int: Int] {
  var r: [Int: Int] = [:]

  func fact(_ x: Int) -> Int {
    if let y = r[x] { return y }
    let y = x == 0 ? 1 : x * fact(x - 1)
    r[x] = y
    return y
  }

  for z in input { _ = fact(z) }
  return r
}

We can imagine the final for loop as launching tasks in parallel for each element of input.

Because of the DAG, the computation of one key's value might conceivably suspend awaiting the computation of another key's value on which it depends, if that computation has already started in another thread. This sounds a bit like a system of shared futures, but it doesn't look like Combine's Future is sendable…

I also may want to batch results computed in each thread and only incorporate them into the final result when a batch is large enough to make the synchronization cost worthwhile. At that point I'd also like everything in the final result to become available to the computation in a given thread. It's not the end of the world if some (key, value) pairs get computed twice; that might get me out of the need to have shared futures.
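To make the batching idea concrete, here is a minimal sketch of what I have in mind. Everything in it is hypothetical (the names SharedResults, Worker, and batchedCompute, the default batch size of 64, and the use of NSLock); it only shows the shape. Each task keeps a private cache, publishes pending results once the batch is large enough, and picks up a snapshot of everything published so far in the same critical section.

```swift
import Foundation

// Hypothetical shared store; all names in this sketch are illustrative.
final class SharedResults: @unchecked Sendable {
  private let lock = NSLock()
  private var merged: [Int: Int] = [:]

  /// Merges a batch and returns a snapshot of everything known so far.
  func exchange(_ batch: [Int: Int]) -> [Int: Int] {
    lock.lock()
    defer { lock.unlock() }
    // Duplicate keys are harmless: both sides computed the same value.
    merged.merge(batch) { old, _ in old }
    return merged
  }
}

// One worker per task: computes into a local cache and syncs in batches.
struct Worker {
  let shared: SharedResults
  let batchSize: Int
  var known: [Int: Int] = [:]    // last snapshot plus local results
  var pending: [Int: Int] = [:]  // local results not yet published

  mutating func fact(_ x: Int) -> Int {
    if let y = known[x] { return y }
    let y = x == 0 ? 1 : x * fact(x - 1)
    known[x] = y
    pending[x] = y
    if pending.count >= batchSize { flush() }
    return y
  }

  mutating func flush() {
    known = shared.exchange(pending)
    pending.removeAll(keepingCapacity: true)
  }
}

func batchedCompute(_ input: [Int], batchSize: Int = 64) async -> [Int: Int] {
  let shared = SharedResults()
  await withTaskGroup(of: Void.self) { group in
    for z in input {
      group.addTask {
        var worker = Worker(shared: shared, batchSize: batchSize)
        _ = worker.fact(z)
        worker.flush()  // publish the final partial batch
      }
    }
    await group.waitForAll()
  }
  return shared.exchange([:])
}
```

With this shape some values may be computed more than once by different tasks, which is the trade-off described above.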

Can anyone suggest the appropriate facilities for solving a problem like this?

I don't have a solution for this, but it's one of those topics that, for some reason, always gets stuck in the back of my mind. I think it relates to some explorations I did a while ago (Experimenting with a Query Resolver System). I'm sure there is nothing here you haven't already thought about, but I figured sharing couldn't hurt. I will keep an eye on this post to see what others suggest; I'm very curious.

Structured concurrency as it stands generally isn't great for DAG-shaped work: you may be adding child tasks to a parent (that we can do) and then, from those child tasks, waiting on some of the added results (this we do not want to do; it would force us into MPMC (multi-producer, multi-consumer) rather than MPSC (multi-producer, single-consumer), which is what task groups are designed for).

Without spending more time thinking about a design here, my gut feeling is that you may have to use unstructured tasks and form that DAG and "dependency chain" via normal Task {} and continuations. I haven't spent much time thinking about this though, sorry; perhaps later I can make more time for it.

You may end up modeling some "controller" actor containing a state machine and using it to orchestrate those tasks, but in the end you'll have to figure out the structure on your own anyway.
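A rough sketch of what such a controller could look like, under my own assumptions: the Coordinator actor, its claimOrAwait/publish methods, and coordinatedCompute are hypothetical names, and factorial stands in for the real computation. Each key is either in progress (with a list of suspended waiters) or done; the first caller claims the key and computes it, and later callers suspend on a continuation until the value is published.

```swift
// Hypothetical coordinator; names are illustrative, not an established API.
actor Coordinator {
  private enum State {
    case inProgress(waiters: [CheckedContinuation<Int, Never>])
    case done(Int)
  }
  private var states: [Int: State] = [:]

  /// Returns the value if it is (or becomes) available, or nil if the
  /// caller has just claimed the key and is responsible for computing it.
  func claimOrAwait(_ key: Int) async -> Int? {
    switch states[key] {
    case .done(let value)?:
      return value
    case .inProgress(let waiters)?:
      // Suspend until publish(_:for:) resumes this continuation.
      return await withCheckedContinuation { (continuation: CheckedContinuation<Int, Never>) in
        states[key] = .inProgress(waiters: waiters + [continuation])
      }
    case nil:
      states[key] = .inProgress(waiters: [])
      return nil
    }
  }

  /// Stores the value and resumes everyone waiting on the key.
  func publish(_ value: Int, for key: Int) {
    if case .inProgress(let waiters)? = states[key] {
      for waiter in waiters { waiter.resume(returning: value) }
    }
    states[key] = .done(value)
  }

  func snapshot() -> [Int: Int] {
    var out: [Int: Int] = [:]
    for (key, state) in states {
      if case .done(let value) = state { out[key] = value }
    }
    return out
  }
}

func factorial(_ x: Int, using coordinator: Coordinator) async -> Int {
  if let ready = await coordinator.claimOrAwait(x) { return ready }
  var value = 1
  if x > 0 {
    let previous = await factorial(x - 1, using: coordinator)
    value = x * previous
  }
  await coordinator.publish(value, for: x)
  return value
}

func coordinatedCompute(_ input: [Int]) async -> [Int: Int] {
  let coordinator = Coordinator()
  // Unstructured tasks, one per requested key.
  let tasks = input.map { z in Task { await factorial(z, using: coordinator) } }
  for task in tasks { _ = await task.value }
  return await coordinator.snapshot()
}
```

Because the dependencies form a DAG, a task that has claimed a key never ends up waiting on itself, so every continuation is eventually resumed.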

Elaborating on @ktoso's suggestion, I think you can use Task itself as a "Future" and use an actor to access the shared state:

import Foundation  // for pthread_self()

extension Dictionary {
  func mapValuesAsync<R>(_ transform: @Sendable (Value) async -> R) async -> [Key: R] {
    var result = Dictionary<Key, R>(minimumCapacity: capacity)
    for (key, value) in self {
      result[key] = await transform(value)
    }
    return result
  }

  mutating func getOrCreate(_ key: Key, factory: () -> Value) -> Value {
    if let result = self[key] {
      return result
    }
    let result = factory()
    self[key] = result
    return result
  }
}

actor Comp {
  private var r: [Int: Task<Int, Never>] = [:]

  var result: [Int: Int] {
    get async {
      await r.mapValuesAsync { task in
        await task.value
      }
    }
  }

  init(_ input: [Int]) async {
    let initialTasks = input.map(fact)
    for task in initialTasks {
      // wait for the requested inputs
      _ = await task.value
    }
    // at this point all additional computations are done
  }

  private func fact(_ x: Int) -> Task<Int, Never> {
    return r.getOrCreate(x) {
      Task.detached {
        print("Starting task for \(x) on thread \(pthread_self())")
        let result = x == 0 ? 1 : x * (await self.fact(x - 1).value)
        print("Finishing task for \(x) = \(result)")
        return result
      }
    }
  }
}

await Comp([6, 4, 5]).result

@dmt (and @ktoso) That's not bad, thank you! I didn't realize that Task was essentially a future, but I see that it is. I was surprised to see that this didn't seem to create a huge number of threads and tended to execute tasks on the same thread where possible. I wonder what the actual synchronization cost is, but this looks like a good start.

When I previously looked at this problem I seized on Operation because it seems to have some ability to handle dependencies, and presumably that could allow scheduling with less synchronization? Any insight there? I could of course try to measure the result. I think because operations are discovered I would still need an actor to coordinate and prevent operations from being duplicated.

Update:
I tried benchmarking @dmt's Task method vs. using Operation, and it looks a lot like Tasks win. They're certainly a lot simpler to use! And it's worth noting that it looks like Operation doesn't really allow for dependency discovery in the same way; the dependencies really have to be discovered in a phase prior to starting a computation.
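For reference, this is roughly the two-phase shape I mean, not the exact benchmark code. ResultBox and operationCompute are hypothetical names for this sketch; the Foundation calls (BlockOperation, addDependency, addOperations(_:waitUntilFinished:)) are the standard ones. The whole graph is built first, and only then handed to the queue.

```swift
import Foundation

// Thread-safe box for the results; a hypothetical helper for this sketch.
final class ResultBox: @unchecked Sendable {
  private let lock = NSLock()
  private var values: [Int: Int] = [:]

  subscript(key: Int) -> Int? {
    get { lock.lock(); defer { lock.unlock() }; return values[key] }
    set { lock.lock(); defer { lock.unlock() }; values[key] = newValue }
  }

  var all: [Int: Int] { lock.lock(); defer { lock.unlock() }; return values }
}

func operationCompute(_ input: [Int]) -> [Int: Int] {
  let box = ResultBox()
  let queue = OperationQueue()
  var operations: [Int: Operation] = [:]

  // Phase 1: discover the whole graph before anything runs.
  func operation(for x: Int) -> Operation {
    if let existing = operations[x] { return existing }
    let op = BlockOperation {
      // Safe to force-unwrap: the dependency on x - 1 has already finished.
      box[x] = x == 0 ? 1 : x * box[x - 1]!
    }
    operations[x] = op
    if x > 0 { op.addDependency(operation(for: x - 1)) }
    return op
  }
  for z in input { _ = operation(for: z) }

  // Phase 2: run everything; the queue honors the declared dependencies.
  queue.addOperations(Array(operations.values), waitUntilFinished: true)
  return box.all
}
```

Note that the body of an operation cannot add a dependency on something it discovers while running, which is why the discovery has to happen up front.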

(Aside) this is a more efficient getOrCreate (thanks to @jrose); I didn't write it that way in my example for perhaps-obvious reasons. It really ought to be in the standard library… for the same reasons:

  mutating func getOrCreate(_ key: Key, factory: () -> Value) -> Value {
    return { (x: inout Value)->Value in x }(
      &self[key, default: factory()]
    )
  }
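
A small usage sketch, in case the behavior isn't obvious. The demo function and its values are made up for illustration. The default: argument of the subscript is an autoclosure, so factory only runs when the key is missing, and the value is inserted and read back in place.

```swift
extension Dictionary {
  mutating func getOrCreate(_ key: Key, factory: () -> Value) -> Value {
    return { (x: inout Value) -> Value in x }(
      &self[key, default: factory()]
    )
  }
}

// Hypothetical demo: counts how many times the factory actually runs.
func demo() -> (first: Int, second: Int, calls: Int) {
  var calls = 0
  var cache: [String: Int] = [:]
  let first = cache.getOrCreate("a") { calls += 1; return 42 }
  // The key now exists, so this factory is never evaluated.
  let second = cache.getOrCreate("a") { calls += 1; return 99 }
  return (first, second, calls)
}
```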

IMHO, this could be an example, but isn't it mostly a shared-state problem here? :thinking: I tried to update the example with Mutex, but the easiest way was actually just to wrap the cache in an actor and use it with a TaskGroup, like:

actor Cache {
    var r: [Int: Int] = [:]

    func update(key: Int, with value: Int) {
        self.r[key] = value
    }
}

func compute(_ input: [Int]) async -> [Int: Int] {
    let cache = Cache()

    @discardableResult
    func fact(_ x: Int, cache: Cache) async -> Int {
        if let y = await cache.r[x] { return y }
        let y = await x == 0 ? 1 : x * fact(x - 1, cache: cache)
        await cache.update(key: x, with: y)
        return y
    }
    
    return await withTaskGroup(of: Void.self, returning: [Int: Int].self) { group in
        for z in input {
            group.addTask {
                await fact(z, cache: cache)
            }
        }
        await group.waitForAll()
        return await cache.r
    }
}

Though of course I'm not sure how often this cache will actually be used, since execution is parallel, but maybe there will be advantages in some cases...

Yes, there will be shared state here as long as we discover dependencies during computation. We could avoid it by building the entire graph ahead of time, but that complicates the code a lot.

I added your method to my benchmarks. It seems to win on every metric other than throughput, which seems very… unaccountably… low. I don't know what to make of that. When you read the throughput numbers, pay attention to the units specified in the left-hand column.

Maybe it has something to do with the fact you're letting the library decide the configuration for each of your benchmarks individually?

EDIT: It's just the way the table is printed:

ParallelDAG:Jaleel (K) << K probably stands for Kilo- units
ParallelDAG:Operations (#) << # probably stands for ones

Ah, got it. Yeah, would be interesting thing to tackle. :thinking:

Oof, thanks; I was playing with the benchmark for half an hour trying to understand...
I think I have an even quicker, more optimized version then: just swapping the actor for the new Mutex. It requires Swift 6 and macOS 15 (you can try NSLock as a backport, but AFAIK performance would then be the same as the actor):

import Synchronization

final class MutexCache: Sendable {
    private let _r: Mutex<[Int: Int]> = Mutex([:])
    var r: [Int: Int] {
        get { self._r.withLock { $0 } }
        set { self._r.withLock { $0 = newValue } }
    }

    func update(key: Int, with value: Int) {
        self.r[key] = value
    }
}

func mutexCompute(_ input: [Int]) async -> [Int: Int] {
    let mutexCache = MutexCache()
    
    @discardableResult
    func fib(_ x: Int, cache: MutexCache) -> Int {
        if let y = cache.r[x] { return y }
        let y = x < 2 ? 1 : fib(x - 1, cache: cache) + fib(x - 2, cache: cache)
        cache.r[x] = y
        return y
    }
    
    await withTaskGroup(of: Void.self) { group in
        for z in input {
            group.addTask {
                fib(z, cache: mutexCache)
            }
        }
        await group.waitForAll()
    }
    return mutexCache.r
}

Ojeez, good catch! That's a bit annoying.

That's essentially what the Operation example does. I bet a system with explicit dependencies could be a lot more efficient than that; I get a sense Operation is “very legacy.”

Why do you think using a Mutex is faster than using an actor?
Update: In fact, I just used a pthread mutex (implementation shamelessly lifted from SwiftNIO) and it was indeed faster, even with these extremely high-granularity tasks. I'm still interested in how you knew it would be faster.

Can't this be tested with a Swift 6 development snapshot toolchain? Answer: not without macOS 15. Surprising that Mutex requires something new in the OS, but there ya go.

It might be worth benchmarking it against OSAllocatedUnfairLock if your target OS is high enough. And if it's not, it's not that complicated to reimplement it with the older C APIs and ManagedBuffer (example).
I think suspending/resuming tasks might have a noticeable impact when the actual computation is as simple as adding two integers. Although I guess your real-world application will be much more computationally expensive so maybe in the end the synchronization cost will be negligible using any of the primitives.

Mutex contains an os_unfair_lock on Darwin (but without the separate allocation).

It mostly will be; the point of this exercise is to measure the synchronization overhead.

You need import os.
Synchronization provides you with Mutex.
os provides OSAllocatedUnfairLock.

Beware that the os module is private to Apple OSes and AFAIK isn't shipped to any other platform.

EDIT: Sorry, didn't see your edit before posting

@dmt Thanks I figured it out and published another benchmark. It is indeed faster. Is the example you pointed at any more portable? If not, perhaps it's an unfair lock on Apple platforms and a pthread mutex everywhere else.

It's not. It can be used on older versions of macOS, but it still relies on Darwin's os_unfair_lock or OSSpinLock.
I see the Mutex implementation for Linux uses futex probably to avoid notifying the kernel when possible.
So I guess to make that solution cross-platform you'd need to reimplement these ll_ prefixed functions and make sure that the size of LowLevelLock matches the size of your underlying primitive.

AFAIK this will end up making a copy of the entire dictionary on each update, while holding the lock. A modify coroutine would help here. Otherwise the actor approach will scale better with cache size.
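
To illustrate, here is a sketch of a cache that does each read and write in place inside the critical section, instead of routing updates through a whole-dictionary get/set pair. LockedCache is a hypothetical name, and I'm using NSLock here only so the sketch runs without macOS 15; with Mutex the same idea would be to mutate the dictionary inside a single withLock call.

```swift
import Foundation

// Hypothetical replacement for the cache above; per-key operations only.
final class LockedCache: @unchecked Sendable {
  private let lock = NSLock()
  private var storage: [Int: Int] = [:]

  /// Reads one key under the lock; no dictionary copy is made.
  subscript(key: Int) -> Int? {
    lock.lock(); defer { lock.unlock() }
    return storage[key]
  }

  /// Writes one key in place; lookup and store share one critical section.
  func update(key: Int, with value: Int) {
    lock.lock(); defer { lock.unlock() }
    storage[key] = value
  }

  /// Copies the dictionary once, for returning the final result.
  var snapshot: [Int: Int] {
    lock.lock(); defer { lock.unlock() }
    return storage
  }
}
```

This also closes the window in which two threads each read the dictionary, modify their own copy, and one overwrites the other's update.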

You could also use a read/write lock. That will definitely scale better than an actor.
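
A minimal sketch of the read/write lock variant, assuming a hypothetical RWLockedCache class built on pthread_rwlock_t. Readers take the shared lock and can proceed concurrently; writers take the exclusive lock. Whether it actually beats the alternatives will depend on the read/write mix, so it would need to be measured.

```swift
import Foundation

// Hypothetical cache guarded by a POSIX read/write lock.
final class RWLockedCache: @unchecked Sendable {
  // Heap-allocated so the lock has a stable address.
  private let rwlock: UnsafeMutablePointer<pthread_rwlock_t>
  private var storage: [Int: Int] = [:]

  init() {
    rwlock = .allocate(capacity: 1)
    rwlock.initialize(to: pthread_rwlock_t())
    pthread_rwlock_init(rwlock, nil)
  }

  deinit {
    pthread_rwlock_destroy(rwlock)
    rwlock.deinitialize(count: 1)
    rwlock.deallocate()
  }

  /// Shared (read) lock: many readers may hold it at once.
  subscript(key: Int) -> Int? {
    pthread_rwlock_rdlock(rwlock)
    defer { pthread_rwlock_unlock(rwlock) }
    return storage[key]
  }

  /// Exclusive (write) lock: blocks readers and other writers.
  func update(key: Int, with value: Int) {
    pthread_rwlock_wrlock(rwlock)
    defer { pthread_rwlock_unlock(rwlock) }
    storage[key] = value
  }
}
```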
