Best practices for operating on a buffer in parallel?

I have an "embarrassingly parallel" problem: computing a function on every pixel. Some values are already computed; there is a special value to indicate notComputed. In the old C++ using pthreads code which I'm porting, the task was dividing into parts and with each thread accessing different scanlines, there was no locking or synchronization on the shared buffer. The buffer is already allocated with some values already computed. When the user resizes or scrolls a view, old values are preserved as new regions are revealed and need to be calculated.

Swift Structured Concurrency is so much easier to work with than the pthreads API, but the way. Many thanks to everyone involved.

I am brand new to using Swift Concurrency. My first attempt at parallelizing in Swift looks like this

    @Sendable func compute(rows: ClosedRange<Int>, data: UnsafeMutableBufferPointer<Pixel>) async {
        for y in rows {
          for x in 0 ..< size.x where data[x + y * size.x] == notComputed {
              let color = scaledColor(colorExpr.evaluate(on: grid, x, y))
              if Task.isCancelled { return }
              data[x + y * size.x] = color
          }
    }

    let pixels = UnsafeMutableBufferPointer<Pixel>(start: &colorMap[0], count: colorMap.count)
    await withTaskGroup(of: Void.self) { taskGroup in
        for k in 0 ..< numProcessors {
            let firstLine = firstRow + k * numScanLinesPerTask
            let lastLine  = min(firstLine + numScanLinesPerTask - 1, lastRow)
            if lastLine >= firstLine { taskGroup.addTask { await compute(rows: firstLine ... lastLine, pixels: pixels) }}
        }
    }

That duplicates the behavior in the original code, and works as intended as far as I can tell.

The compiler doesn't allow use of colorMap directly, as shared mutable state. Is subverting that with UnsafeMutableBufferPointer knowing that each task accesses disjoint rows merely poor style or is it incorrect in ways I don't understand yet? The compiler warns that Initialization of ‘UnsafeMutableBufferPointer<Pixel>’ results in a dangling buffer pointer. Is using that dangling pointer across parallel tasks merely working by accident at the moment?

Several questions:

  1. Is the approach above wrong in ways I don't understand?
    1a. Is using the dangling point incorrect?
    1b. Is sharing the dangling pointer across parallel tasks incorrect?
  2. What would be best practice?
  3. Can it be done in a better style without introducing an extra copy into and out of each task?
  4. Can it be done without actor isolation of the shared mutable state?

Indeed, it triggers undefined behaviour, you are lucky it works. Normally withUnsafeMutableBytes is used where bytes are available inside the provided closure.

Will the following alternative version work for you?

@Sendable func compute(rows: ClosedRange<Int>, pixels: UnsafeMutablePointer<Pixel>) async {
    // ...
}

func foo() async {
    var colorMap: [Pixel] = ....
    await bar(&colorMap)
}

func bar(_ pixels: UnsafeMutablePointer<Pixel>) async {
    await withTaskGroup(of: Void.self) { taskGroup in
        for k in 0 ..< numProcessors {
            let firstLine = firstRow + k * numScanLinesPerTask
            let lastLine  = min(firstLine + numScanLinesPerTask - 1, lastRow)
            if lastLine >= firstLine {
                taskGroup.addTask {
                    await compute(rows: firstLine ... lastLine, pixels: pixels)
                }
            }
        }
    }
}

Btw, isn't the task you have at hand better handled with Metal shader?

1 Like

Yes, that compiles without warning and behaves as desired. Does that avoid undefined behavior?

The function can be arbitrarily complicated and is user-defined. While the old code would compile to OpenGL ARB_fragment_programs in limited cases and compiling to Metal is on our list eventually, it is beyond the scope of the immediate porting effort.

Or unlucky: it just means it’ll come back to bite you later.

It is important to understand that Swift’s compiler is not perfect: not everything that compiles is actually correct Swift code. Requirements that cannot be checked at compile-time are documented as preconditions, which the caller must avoid violating.

There are some guardrails, generally: functions with preconditions are expected to actually check them at runtime in case the caller made a mistake, though that check can be skipped with -Ounchecked and you should not rely on it deliberately.

Anything involving the prefix “unsafe” doesn’t check preconditions in -O mode, or possibly ever. Pointers fall into that category: when passed implicitly like that, they are not allowed to escape the scope.

This particular precondition is looking likely to be compile-time checked in the future, actually: there are plans to explicitly mark arguments as @nonescaping.

Once we get reasync or similar, I'm very interested in making all the temporary buffer APIs support async use for reasons basically like this.

7 Likes

Back to the main question:

  1. What is currently the best way to parallelize operations on disjoint slices of an array?
  2. Will the standard library have a parallel map operation?

I think you are looking for DispatchQueue.concurrentPerform(). See this discussion.

2 Likes

Would also like to hear about that!

Meanwhile

What is the final destination of those pixels? I assume OpenGL or Metal. I believe those do not need Array necessarily, raw memory serves them well. Why do you need to have an Array / Array slices in the first place? Looks like UnsafeMutablePointer or (better) UnsafeMutableBufferPointer should be alright for your task.

1 Like

While the example I quoted is generated a bitmap which is used by CoreGraphs later, the same issue arises in many places with different Array element types in different contexts. The commonality is computing f(x,y) or f(x,y,z) across some domain where results are independent. Another critical path case is computing f(x,y,z) -> Double on a cubic lattice then uses for polygonisation generating an implicit surface. I've tried avoiding the Unsafe Pointer types until Instruments indicates the necessity.

Understandable.

BTW, when you access Array element:

var x = array[2]
x += 1
array[2] = x

is it safe to assume that array loads and writes back just that element, without reading (and possibly writing), other elements, say 0...3 ?

pseudo code:
var (a, b, x, d) = array[0 ... 3]
x += 1
array[0 ... 3] = (a, b, x, d)

[Edit: the same question can be asked about "memcpy", "memcmp", etc. - but that's beyond swift.]

[Edit2: is it safe at all to read/modify the same array from different threads without synchronization? Maybe if you are only changing the contents but not the number of elements it is? Is situation the same with dictionary (when you are not changing the keys just the values)?]

[Edit3: if I were to read/modify array contents from different threads without any "unsafe" operations (forget about performance for now) will I have to use a mutex or is it possible to do without? Say, I specifically want threads, without serialising array access on a serial queue.]

1 Like

Partially answering myself on issues raised in Edit2 & Edit3. I'd say it is not safe to use Array from different threads without synchronisation, even under those special circumstances of changing just the contents and not the size. Or even if any access to the array is "read" access. Here is why. The following implementation of Swift Array is not unimaginable:

enum LogAccess {
    case count(result: Int)
    case getter(index: Int, result: WeakHolder)
    case setter(newElement: WeakHolder, oldElement: WeakHolder, index: Int)
    // ... insert, delete, etc
}

struct Array<Element> {
    var realCount: Int { ... }
    func realGet(at index: Index) -> Element { ... }
    mutating func realSet(_ element: Element, at index: Index) { ... }
    
    #if SPECIAL_DEBUG_MODE
    var logAccess: [LogAccess] = []
    #endif
    
    var count: Int {
        let n = realCount
        #if SPECIAL_DEBUG_MODE
        // Edit: bug here. won't be able mutating logAccess from immutable "count".
        logAccess.append(.count(result: n))
        #endif
        return n
    }
    
    subscript(_ index: Int, x: Int) -> Element {
        get {
            let result = realGet(at: index)
            #if SPECIAL_DEBUG_MODE
            // Edit: bug here. won't be able mutating logAccess from immutable "get".
            logAccess.append(.getter(index: index, result: result))
            #endif
            return result
        }
        set {
            let oldElement = realGet(at: index)
            realSet(newValue, at: index)
            #if SPECIAL_DEBUG_MODE
            logAccess.append(.setter(newElement: newValue, oldElement: oldElement, index: index))
            #endif
        }
    }
}

where SPECIAL_DEBUG_MODE is activated e.g. in Xcode diagnostics or as a compilation flag.

This implementation doesn't violate any Array guarantees (e.g. O(1) operations remain so) but it is obvious if you try to call even array.count (or array."anything else") array[x] = y on the same array from two different threads the underlying unsynchronised access to logAccess will crash, trash memory, or misbehave in other ways. To me this is a proof that any mutable Array call (even count) must be synchronised (via mutex, or serial dispatch queue, etc) if array is used in several threads , even if that use is "all readers, no writers". Sounds weird, I know. Would like to be proven wrong on this one, perhaps I'm missing some language guarantees, or "unwritten but obvious rules" here.

Usually @eskimo is very good with these matters, so mentioning him just in case he missed this thread... ;-)

PS. I changed LogAccess-s "Any" to "WeakHolder" (whatever that is) to not introduce strong references to array elements.

PS2. Correcting above: "read access" seems to be fine, as it won't be possible mutating logAccess variable from immutable methods.. unless logAccess is not an instance variable but some static / global variable - but in that case its use will have to be protected internally as it will be used from different arrays (from different threads).

1 Like

Array is a value type so I think mutating it from different threads would break the law of inout memory exclusivity, so definitely wrong. What should be fine is to obtain a pointer to the underlying storage and work on that.

Something like this:

array.withUnsafeMutableBufferPointer { ptr in
    DispatchQueue.concurrentPerform(iterations: ptr.count) { index in
        // assign results to ptr[index]
    }
}
3 Likes
Yep, that's understandable.

The question I have in mind is slightly to the side of the main topic question: whether it is possible to do what OP is asking without any "unsafe" operations (even if this implementation will be slow). And preferably without resorting to reference types like NSArray/NSData, etc, staying in "pure swift". A quick and dirty attempt to avoid data races on array leaded me here:

        var _array: [Element] = ...
        ...
        pthread_mutex_lock(&mutex) // ***
        let result = _array
        pthread_mutex_unlock(&mutex)

just now I am having "Swift access race in App.Test.mutex.modify : __C._opaque_pthread_mutex_t at 0x7b1800023700" on *** line.

I know I can do it with serialQueue.async. Still, is it possible to do it with mutexes?

1 Like

You cannot use the ampersand operator with pthread_mutex_lock() (and generally with low-level locks and atomics). This is a programming error that everybody makes. See this, this and this.

3 Likes

Thank you. Looks like it is not possible to use pthread_mutex without "unsafe" pointers then.

It is possible to use array like type without mentioning "unsafe" but I won't use it in production as it is obviously much slower (10x? 100x?) than versions based on UnsafeMutable|Buffer|Pointer.

A minimal thread safe array wrapper.
Note, here I am converting Array back to reference type.

class MyArray<Element> {
    var array: [Element] = []
    
// version of lock based on NSLock
//    var mutex = NSLock()
//    func lock() {
//        mutex.lock()
//    }
//    func unlock() {
//        mutex.unlock()
//    }

    // version of lock based on semaphore
    let sema = DispatchSemaphore(value: 1)
    func lock() {
        sema.wait()
    }
    func unlock() {
        sema.signal()
    }

    required init(repeating: Element, count: Int) {
        array = [Element](repeating: repeating, count: count)
    }
    var count: Int {
        lock()
        let n = array.count
        unlock()
        return n
    }
    subscript (index: Int) -> Element {
        get {
            lock()
            let result = array[index]
            unlock()
            return result
        }
        set {
            lock()
            array[index] = newValue
            unlock()
        }
    }
}
1 Like