Multi-thread processing operator for stdlib?

Hi S/E,

I found myself writing the following code for a macOS project recently:

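extension DispatchGroup {

    /// Runs the closures returned by `workProvider` on a concurrent queue,
    /// at most `threads` at a time, and returns once all of them have finished.
    public static func parallelize(threads: Int,
                                   name: String = "Parallelize",
                                   priority: DispatchQoS = .default,
                                   workProvider: () -> (() -> Void)?) {
        let concurrentQueue = DispatchQueue(label: name, qos: priority, attributes: .concurrent)
        let semaphore = DispatchSemaphore(value: threads)
        let threadGroup = DispatchGroup()

        // Keep pulling work until the provider returns nil.
        while let nextWork = workProvider() {
            semaphore.wait()      // blocks while `threads` items are already in flight
            threadGroup.enter()

            concurrentQueue.async {
                nextWork()
                threadGroup.leave()
                semaphore.signal()  // frees a slot for the next item
            }
        }

        // Wait for the items still in flight.
        threadGroup.wait()
    }
}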

It can be used as in the example below (the workProvider is a source of closures to process across the number of threads specified):

        let fileEnumerator = filemgr.enumerator(atPath: projectRoot)

        DispatchGroup.parallelize(threads: 4) {
            guard let file = fileEnumerator?.nextObject() else {
                return nil
            }
            return {
                if let path = file as? String,
                    (try? LanguageIdentifier(for: URL(fileURLWithPath: path))) != nil {
                    self.processFile(path: self.projectRoot + "/" + path)
                }
            }
        }

Would/should there be space for this abstraction in the standard library?

edit: made it an extension on DispatchGroup

In general I'm in favour of improving the accessibility of parallelisation. None of the following is meant to take away from or discourage that - I'm just adding ideas to the pool.

An alternative approach would be forEach-style, e.g.

DispatchGroup.forEach(in: fileEnumerator, maxConcurrency: 4) {
    if let path = $0 as? String, (try? LanguageIdentifier(for: URL(fileURLWithPath: path))) != nil {
        self.processFile(path: self.projectRoot + "/" + path)
    }
}

Seems like it saves a bit of work, as long as you can supply an enumerator.

A variant could be provided which takes a 'work provider' closure as in your original example, instead of an enumerator. Ideally at some point in the future Swift would make them equivalent, presumably once Swift gets religion on generators et al. :)

It might be useful to have a way to dynamically adjust the concurrency. A place I frequently use concurrency of this nature is when reading large numbers of files. An optimal implementation of that - in terms of minimal CPU & memory usage, minimal latency, and maximum throughput - requires dynamically adjusting the level of parallelism (to automatically optimise for the hardware configuration in hand and adapt to interference from any other activity going on within the computer).

An initial implementation certainly isn't required to support this, but it would be wise to anticipate it and have some idea of how it might be evolved into the API in future.

Grand Central Dispatch is not part of the stdlib. I believe the current policy is to punt on all such proposals until the language gains some kind of asynchronous programming primitives. While they are likely to be async/await, this has not been decided by SE, never mind implemented in any language version.

You may want to take a look at DispatchQueue.concurrentPerform
https://developer.apple.com/documentation/dispatch/dispatchqueue/2016088-concurrentperform

You can’t specify the number of threads but it uses all available cores if you give it enough work. The number of iterations should be higher than the number of available cores. This allows the system to use the available resources better because most tasks will not take the exact same duration.
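For reference, a minimal sketch of what that looks like in practice. The function name parallelSquares and the squaring work are made up for illustration; only DispatchQueue.concurrentPerform(iterations:execute:) is the real API.

```swift
import Dispatch

/// Hypothetical example: squares every element in parallel.
func parallelSquares(_ inputs: [Int]) -> [Int] {
    var results = [Int](repeating: 0, count: inputs.count)
    results.withUnsafeMutableBufferPointer { buffer in
        // Copy the pointer so the work closure does not capture the inout parameter.
        let out = buffer
        // Blocks until every iteration has run; the system decides how many run at once.
        DispatchQueue.concurrentPerform(iterations: inputs.count) { index in
            // Each iteration writes only its own slot, so no locking is needed.
            out[index] = inputs[index] * inputs[index]
        }
    }
    return results
}
```

Note that the work closure only receives an index, so the inputs have to be in something indexable (here an array) before the call.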

One thing I notice about the implementation: it looks like you're peeling off work items one at a time and synchronizing on each item (semaphore wait/signal).

Isn't that fairly non-optimal with respect to performance? I thought it was generally better to parallelize bigger chunks of work, since synchronization in itself is fairly expensive.

So, for instance, if you were operating on a list of 1000 items, in many cases it would be better to break off chunks of 10 or 100 items to handle in parallel, rather than taking them one at a time. I.e. you would pay the synchronization cost 10 or 100 times rather than 1000 times.

It doesn't seem like there's any reason you couldn't pass chunked work from your workProvider closure, but if we're talking about adding abstractions for concurrency, it would be better to roll chunking into the abstraction itself, since this version would probably encourage people to do things in a fairly sub-optimal way.

Batching exacerbates the straggler problem, where you end up losing effective parallelism while waiting for the last few work items to complete; items in a batch are held up, effectively serialised, even when there are other threads (cores) available to execute them.

It is of course a trade-off, between performance lost waiting for stragglers vs performance lost in thread & work management overhead. There’s no hard and fast rule on where to make that trade-off - it depends on a lot of factors, such as the nature of the work involved, the machine it’s being run on, etc.

Which is not to say, of course, that it wouldn’t be pretty trivial to add a batchSize parameter to any such function in order to expose that as a tunable.
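A rough sketch of what such a tunable could look like, built on DispatchQueue.concurrentPerform for brevity. The function forEachBatched and its parameter names are hypothetical, not an existing or proposed API.

```swift
import Dispatch

/// Hypothetical helper: calls `body` for every item, handing out work in
/// batches of `batchSize` so scheduling overhead is paid once per batch.
/// `body` is called from several threads at once and must be thread-safe.
func forEachBatched<T>(_ items: [T], batchSize: Int, _ body: (T) -> Void) {
    precondition(batchSize > 0, "batchSize must be positive")
    // Round up so a final, shorter batch picks up the remainder.
    let batchCount = (items.count + batchSize - 1) / batchSize
    DispatchQueue.concurrentPerform(iterations: batchCount) { batch in
        let start = batch * batchSize
        let end = min(start + batchSize, items.count)
        // Items within one batch run serially on whichever thread took the batch.
        for index in start..<end {
            body(items[index])
        }
    }
}
```

With 1000 items, batchSize: 100 gives 10 units of work and batchSize: 1 gives 1000, so the same parameter covers both ends of the straggler vs overhead trade-off.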

That’s logical, but also frustrating, since (AFAIK) native concurrency support, while often discussed, still has no roadmap nor schedule in Swift?

At what point does this become a perfect-the-enemy-of-the-good problem?

Thanks for all the input. Y’all are quite right this would be an addition to swift-corelibs-libdispatch not stdlib. I’ve reworked my code to follow @wadetregaskis’s signature subsuming the processing of the sequence.

extension DispatchGroup {

    static func forEach<Seq: Sequence>(in sequence: Seq,
                 maxConcurrency: Int,
                 queueName: String = "concurrentEach",
                 priority: DispatchQoS = .default,
                 worker: @escaping (Seq.Element) -> Void) {
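The signature above breaks off before its body. A minimal sketch of how it might be completed, assuming it follows the same semaphore-plus-group pattern as parallelize; the body below is that assumption, not the poster's actual code.

```swift
import Dispatch

extension DispatchGroup {

    // Sketch only: the body is assumed, modeled on parallelize above.
    static func forEach<Seq: Sequence>(in sequence: Seq,
                                       maxConcurrency: Int,
                                       queueName: String = "concurrentEach",
                                       priority: DispatchQoS = .default,
                                       worker: @escaping (Seq.Element) -> Void) {
        let concurrentQueue = DispatchQueue(label: queueName, qos: priority,
                                            attributes: .concurrent)
        let semaphore = DispatchSemaphore(value: maxConcurrency)
        let threadGroup = DispatchGroup()

        for element in sequence {
            semaphore.wait()      // at most maxConcurrency workers in flight
            threadGroup.enter()

            concurrentQueue.async {
                worker(element)
                threadGroup.leave()
                semaphore.signal()
            }
        }

        // Block the caller until the remaining workers have finished.
        threadGroup.wait()
    }
}
```

Because the loop pulls one element at a time, this works for sequences of unknown length, such as a file enumerator wrapped in a Sequence.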

I’d be more enthusiastic about concurrentPerform if it were a method on a DispatchQueue instance rather than a class method with an awkward implicit argument which is the current queue. There is a good discussion about it here. This means it is no use when it is called from the main thread and you would need to do something like this (this time a version of map in an extension on Sequence…):

import Dispatch

extension Sequence {

    /// Maps the sequence concurrently, with at most `maxConcurrency` workers in flight.
    /// Each worker must call its completion closure exactly once with its result.
    @discardableResult
    func concurrentMap<Output>(maxConcurrency: Int,
                               queueName: String = "concurrentMap2",
                               priority: DispatchQoS = .default,
                               worker: @escaping (Element, @escaping (Output) -> Void) -> Void) -> [Output] {
        let input = Array(self)

        // Fill the array's storage in place. Slots are written with initialize(to:)
        // rather than zeroed with memset and then assigned, because assigning to a
        // slot that never held a valid Output is undefined for non-trivial types.
        return Array<Output>(unsafeUninitializedCapacity: input.count) {
            buffer, initializedCount in
            // Copy the inout buffer so the escaping closures below can capture it.
            let buff = buffer
            let concurrentQueue = DispatchQueue(label: queueName, qos: priority,
                                                attributes: .concurrent)
            let semaphore = DispatchSemaphore(value: maxConcurrency)
            let threadGroup = DispatchGroup()

            for index in 0..<input.count {
                semaphore.wait()
                threadGroup.enter()

                concurrentQueue.async {
                    worker(input[index], { result in
                        // Each index is written by exactly one worker, so no lock is needed.
                        (buff.baseAddress! + index).initialize(to: result)
                        threadGroup.leave()
                        semaphore.signal()
                    })
                }
            }

            // Every slot is initialized once all workers have reported back.
            threadGroup.wait()
            initializedCount = input.count
        }
    }
}

I agree trying to include chunking of work probably creates more problems than it solves. I had in mind reasonably large pieces of work so the synchronisation overhead wouldn't be significant.

It’s not a matter of perfect vs good... GCD isn’t part of the project, so IIUC extending it isn’t really a thing we can do as an official part of Swift.

It isn’t indeed, but it is IMO the de facto if not de jure concurrency library for Swift today. It is included with every Swift install on macOS & Linux (since Swift 3, at least).

Nonetheless, you’re right that this isn’t technically the appropriate forum - Grand Central Dispatch points to a mailing list & GitHub project where, at a glance at the pull requests, it does seem to be a genuinely open project these days, not merely the lip service Apple used to pay to open source.

Still, IMO it’s good to discuss concurrency improvements in Swift in these forums, even if the implementation aspect might take place outside the core Swift libraries, since concurrency is so fundamental & critical, and might be shaped better (read: more Swiftonic) in these forums.

Thanks for reminding me of that API - it is indeed useful in some cases.

However, in the wide family of situations the original poster was considering - i.e. where you’re enumerating over an indeterminate number of work items - it’s not a good fit. It requires you to know in advance how many items to iterate on, and even beyond that it’s a bit kludgy in the small details - e.g. it just gives you an arbitrary integer index, which you have to, inside your work function, somehow map to the actual thing you care about, e.g. a file.

And as @johnno1962 noted, it has some ergonomic issues, like running implicitly against the current queue, which is almost never what you want from that sort of API.
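One workaround is to hop onto the queue you actually want before calling it. A minimal sketch, assuming (as described above) that concurrentPerform takes its context from the queue it is called on; the wrapper function and its completion parameter are hypothetical.

```swift
import Dispatch

/// Hypothetical wrapper: runs concurrentPerform from `queue` instead of the
/// caller's queue, then calls `completion` on that same queue when done.
func concurrentPerform(on queue: DispatchQueue,
                       iterations: Int,
                       work: @escaping (Int) -> Void,
                       completion: @escaping () -> Void) {
    queue.async {
        // Blocks this queue's thread, not the caller's, until all iterations finish.
        DispatchQueue.concurrentPerform(iterations: iterations, execute: work)
        completion()
    }
}
```

Called from the main thread, this returns immediately; the completion closure would typically dispatch back to DispatchQueue.main to pick up the results.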
