I recently needed a concurrent map: part of a program had to read chunks of data, process them concurrently, and assemble the results in an array. This isn't as straightforward as it sounds, because arrays are value types. I came up with the following snippet, which I'd like to check for correctness; it could also be helpful to others.

Perhaps this is something Dispatch should provide out-of-the-box?

- Karl

extension DispatchQueue {

  static func concurrentMap<T>(iterations: Int, execute block: (Int) -> T) -> [T] {
    var result = Array<T>()
    result.reserveCapacity(iterations)

    result.withUnsafeMutableBufferPointer { (results: inout UnsafeMutableBufferPointer<T>) in
      concurrentPerform(iterations: iterations) { idx in
        results[idx] = block(idx)
      }
    }

    return result
  }
}

extension Array {
  func concurrentMap<T>(execute block: (Element)->T) -> [T] {
    return DispatchQueue.concurrentMap(iterations: count) { block(self[$0]) }
  }
}

Ah one second, I was refactoring this and forgot to test it. (The snippet above doesn't work: reserveCapacity only reserves storage without changing count, so withUnsafeMutableBufferPointer sees an empty buffer and the writes go out of bounds.) Here's the actual code:

extension DispatchQueue {

  static func concurrentMap<T>(iterations: Int, execute block: (Int) -> T) -> [T] {

    let __result = UnsafeMutableRawBufferPointer.allocate(count: iterations * MemoryLayout<T>.stride)
    defer { __result.deallocate() }
    let _result = __result.baseAddress?.assumingMemoryBound(to: T.self)
    let result = UnsafeMutableBufferPointer<T>(start: _result, count: iterations)
    concurrentPerform(iterations: iterations) { idx in
      result[idx] = block(idx)
    }
    return Array(result)
  }
}

extension Array {
  func concurrentMap<T>(execute block: (Element)->T) -> [T] {
    return DispatchQueue.concurrentMap(iterations: count) { block(self[$0]) }
  }
}

Unfortunately I don’t think there’s a way to get an array to take over a +1 UnsafeMutableBufferPointer without copying.

- Karl

···

On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote:


Ah one second, I was refactoring this and forgot to test it. Here’s the actual code:

A map presumably requires an input

extension DispatchQueue {

  static func concurrentMap<T>(iterations: Int, execute block: (Int) -> T) -> [T] {

    let __result = UnsafeMutableRawBufferPointer.allocate(count: iterations * MemoryLayout<T>.stride)
    defer { __result.deallocate() }
    let _result = __result.baseAddress?.assumingMemoryBound(to: T.self)

You never bound the memory to T, so this will be undefined behavior.

    let result = UnsafeMutableBufferPointer<T>(start: _result, count: iterations)
    concurrentPerform(iterations: iterations) { idx in
      result[idx] = block(idx)

You also never initialized the Ts in that memory region, so assigning
into them will also be undefined behavior.

    }
    return Array(result)
  }
}

extension Array {
  func concurrentMap<T>(execute block: (Element)->T) -> [T] {
    return DispatchQueue.concurrentMap(iterations: count) { block(self[$0]) }
  }
}

Unfortunately I don’t think there’s a way to get an array to take over a +1
UnsafeMutableBufferPointer without copying.

The only correct way to do this without creating intermediate storage is
to have a way to initialize your result elements, e.g.:

  import Dispatch

  protocol DefaultInitializable {
    init()
  }

  extension RandomAccessCollection {
    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T]
    where T : DefaultInitializable {
      var result = Array(
        repeating: T(), count: numericCast(self.count))

      DispatchQueue.concurrentPerform(iterations: result.count) {
        offset in
        result[offset] = transform(
          self[index(startIndex, offsetBy: numericCast(offset))])
      }
      return result
    }
  }

  extension Int : DefaultInitializable { }

  print((3..<20).concurrentMap { $0 * 2 })
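  // prints [6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]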

If you don't want the DefaultInitializable requirement (or some other
way to prepare initialized elements), you'll need to manage memory
yourself:

  extension RandomAccessCollection {
    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T] {
      let n = numericCast(self.count) as Int
      let p = UnsafeMutablePointer<T>.allocate(capacity: n)
      defer { p.deallocate(capacity: n) }

      DispatchQueue.concurrentPerform(iterations: n) {
        offset in
        (p + offset).initialize(
          to: transform(
            self[index(startIndex, offsetBy: numericCast(offset))]))
      }

      return Array(UnsafeMutableBufferPointer(start: p, count: n))
    }
  }

This posting highlights a couple of weaknesses in the standard library
for which I'd appreciate bug reports:

1. No way to arbitrarily initialize an Array's storage.
2. UnsafeMutableBufferPointer doesn't have an allocating init

Thanks!

···

on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:

On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote:


--
-Dave


A map presumably requires an input

DispatchQueue.concurrentMap maps a Range<Int> -> T, but since the range is always 0..<n, we only ask for the value of n. It could also be written quite naturally as an extension on Range and build everything on top of it.


Unfortunately I don’t think there’s a way to get an array to take over a +1
UnsafeMutableBufferPointer without copying.

The only correct way to do this without creating intermediate storage is
to have a way to initialize your result elements, e.g.:


I had a go at doing that before, using Optional<T> and unwrapping at the end — but it occurred to me that it would be very inefficient for things like Optional<Int>, and introduces more allocations.

This posting highlights a couple of weaknesses in the standard library
for which I'd appreciate bug reports:

1. No way to arbitrarily initialize an Array's storage.
2. UnsafeMutableBufferPointer doesn't have an allocating init


Filed:

1. [SR-3087] No way to arbitrarily initialise an Array's storage · Issue #45677 · apple/swift · GitHub
2. [SR-3088] UnsafeMutableBufferPointer doesn't have an allocating init · Issue #45678 · apple/swift · GitHub

What is your opinion on the corelibs extending the standard library types? Foundation does it to provide APIs from NSString, but it’s kind of a special case. Would it be reasonable for Dispatch (which is not _such_ a special case) to also extend types like Range and Collection?

I quite like the API as an extension on Range. I think it would be a nice addition to Dispatch (once we start allowing additive proposals):

extension Range where Bound : Strideable, Bound.Stride : SignedInteger {

  func concurrentMap<T>(_ transform: (Bound) -> T) -> [T] {
    let n = numericCast(count) as Int
    let buffer = UnsafeMutablePointer<T>.allocate(capacity: n)

    DispatchQueue.concurrentPerform(iterations: n) {
      (buffer + $0).initialize(to: transform(lowerBound + numericCast($0)))
    }

    // Unfortunately, the buffer is copied when making it an Array<T>.
    defer { buffer.deallocate(capacity: n) }
    return Array(UnsafeMutableBufferPointer<T>(start: buffer, count: n))
  }
}

extension Collection {
  func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T] {

    // ‘as Range’ because CountableRange is a collection, causing the function to be recursive.
    return ((0..<numericCast(count)) as Range).concurrentMap {
      transform(self[index(startIndex, offsetBy: numericCast($0))])
    }
  }
}
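
A quick usage check, assuming the code above compiles as written for this Swift version. Results come back in index order because each iteration initializes its own slot:

print(((0 ..< 4) as Range).concurrentMap { $0 * $0 })  // [0, 1, 4, 9]
print([10, 20, 30].concurrentMap { $0 + 1 })           // [11, 21, 31]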

Thanks

- Karl

···

On 30 Oct 2016, at 19:23, Dave Abrahams via swift-users <swift-users@swift.org> wrote:
on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:

On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote:


DispatchQueue.concurrentMap maps a Range<Int> -> T, but since the
range is always 0..<n, we only ask for the value of n. It could also
be written quite naturally as an extension on Range and build
everything on top of it.

Sorry, I wrote “a map presumably requires an input” before I realized
what you were doing. I should have deleted that.


I had a go at doing that before, using Optional<T> and unwrapping at
the end — but it occurred to me that it would be very inefficient for
things like Optional<Int>, and introduces more allocations.

Yeah, optional is not really a good choice for that application.


Filed:

1. [SR-3087] No way to arbitrarily initialise an Array's storage · Issue #45677 · apple/swift · GitHub
2. [SR-3088] UnsafeMutableBufferPointer doesn't have an allocating init · Issue #45678 · apple/swift · GitHub

Thanks for these!

What is your opinion on the corelibs extending the standard library
types?
Foundation does it to provide APIs from NSString, but it’s kind of a
special case.

My opinion is that, *as a rule*, frameworks should avoid extending APIs
from other frameworks; it makes it very hard for the author of the
original type to manage the user experience of her type. But there are
exceptions to every rule ;-)

Would it be reasonable for Dispatch (which is not _such_ a special
case) to also extend types like Range and Collection?

Oh, well now *Collection* is not a type (despite how it's defined in the
language); it's a protocol. Extending Collection is the best way to
write generic algorithms over all Collections, so I support that. But
it does have to be done very judiciously, because you have to realize
the API you are providing is going to appear on everything that conforms
to Collection.

I quite like the API as an extension on Range. I think it would be a
nice addition to Dispatch (once we start allowing additive proposals):


I see the beauty in what you're doing here, but I don't see any
advantage to it for users. Now Range (which will be collapsed with
CountableRange in Swift 4) will have two overloads of concurrentMap. In
general, avoidable overloads are bad for the user experience.

HTH,

···

on Sun Oct 30 2016, Karl <razielim-AT-gmail.com> wrote:

On 30 Oct 2016, at 19:23, Dave Abrahams via swift-users <swift-users@swift.org> wrote:
on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:

On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote:

--
-Dave

Oh, and I should add, a suite of parallel algorithms would be great, but
it should be implemented similarly to lazy, so instead of

   x.concurrentMap { ... }.concurrentFilter { ... }

you'd write

   x.parallel.map { ... }.filter { ... }
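
A minimal sketch of that shape, building on the RandomAccessCollection.concurrentMap above; the Parallel wrapper is illustrative, not an existing API, and is spelled with current Swift names:

  struct Parallel<Base: RandomAccessCollection> {
    let base: Base

    // Each step runs concurrently and re-wraps its (eager) result,
    // so chained calls stay in the parallel "namespace", like .lazy.
    func map<T>(_ transform: (Base.Element) -> T) -> Parallel<[T]> {
      return Parallel<[T]>(base: base.concurrentMap(transform))
    }

    func filter(_ isIncluded: (Base.Element) -> Bool) -> Parallel<[Base.Element]> {
      // Evaluate predicates in parallel, then compact serially.
      let keep = base.concurrentMap(isIncluded)
      let kept = zip(base, keep).compactMap { $1 ? $0 : nil }
      return Parallel<[Base.Element]>(base: kept)
    }
  }

  extension RandomAccessCollection {
    var parallel: Parallel<Self> { return Parallel(base: self) }
  }

  // (0 ..< 100).parallel.map { $0 * $0 }.filter { $0 % 2 == 0 }.base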

Cheers,

···

on Sun Oct 30 2016, Dave Abrahams <swift-users-AT-swift.org> wrote:


--
-Dave

Notwithstanding your comment about `collection.parallel.map { }.filter {}`, the issue here is with the literal being interpreted as CountableRange. That can be really annoying in general, and I’m glad the different Range types are going away.

If Range conditionally conforms to Collection (say, when its Bound is Strideable), then `(0..<5).concurrentMap {}` should still call the function on Range because it’s “more specialised”, isn't that correct?

Thanks

- Karl

···

On 31 Oct 2016, at 05:15, Dave Abrahams <dabrahams@apple.com> wrote:

on Sun Oct 30 2016, Karl <razielim-AT-gmail.com> wrote:

On 30 Oct 2016, at 19:23, Dave Abrahams via swift-users <swift-users@swift.org> wrote:

on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:

On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote:


Notwithstanding your comment about `collection.parallel.map { }.filter
{}`, the issue here is with the literal being interpreted as
CountableRange.

Not to be blunt, but I don't see how that's the central issue at all.

That can be really annoying in general, and I’m glad the different
Range types are going away.

Me too. Give me the compiler features I need, and I'll give you
beautiful, pleasurable APIs.

If Range conditionally conforms to Collection (say, when its Bound is
Strideable), then `(0..<5).concurrentMap {}` should still call the
function on Range because it’s “more specialised”, isn't that correct?

Yes, but it wouldn't prevent the user from seeing two overloads for
concurrentMap.

···

on Mon Oct 31 2016, Karl <razielim-AT-gmail.com> wrote:

On 31 Oct 2016, at 05:15, Dave Abrahams <dabrahams@apple.com> wrote:
on Sun Oct 30 2016, Karl <razielim-AT-gmail.com> wrote:

On 30 Oct 2016, at 19:23, Dave Abrahams via swift-users <swift-users@swift.org> wrote:
on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:

On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote:

--
-Dave

Is the batching really required? The documentation says:

iterations: The number of times to execute the block. Higher iteration values give the system the ability to balance more efficiently across multiple cores.

So it would seem that Dispatch prefers to know the total count and will divvy those tasks optimally by itself.

The optimal batch size for concurrent map operations is notoriously difficult to get right. There's some work that has to be done per-item, and some overhead that has to be done per-batch. You extract more concurrency and better balance resource utilization by having more work items, but that increases the overhead.

If we have unbounded execution resources, do not know the actual per-item and per-batch costs, and are only trying to minimize wall-time t, all we can say is that the optimal choice of batch size is O(sqrt(n)), with constant proportional to the ratio of the costs. Obviously that's a very contrived situation, though. We never have unbounded execution resources, and we never only care about minimizing wall-time t for our operation in isolation--we want to make the whole computational system more efficient. Both of these deviations from the "spherical cow" lead to an optimal number of batches of the form min(c₁, c₂n, c₃sqrt(n)).
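
To sketch where that sqrt comes from (my reconstruction, not part of the original post): say submitting a batch costs s of serial overhead and each item costs a, so with k batches the wall time is roughly t(k) = k·s + (n/k)·a. Setting dt/dk = s − a·n/k² = 0 gives k* = sqrt(a·n/s), i.e. O(sqrt(n)) batches, with the constant depending on the ratio of the per-item and per-batch costs.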

So it's a big mess; it's very hard to say in general what the batch size should be. But a work item per element is never the right choice, except when you're doing a huge amount of work for each element and there are very few of them.
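
For illustration, a sketch of batching layered on concurrentPerform; the batch-count heuristic is invented for the example (in the min(c₁, c₂n, c₃sqrt(n)) shape above), not a tuned recommendation:

import Dispatch
import Foundation // for ProcessInfo

func concurrentBatchedForEach(iterations n: Int, execute body: (Int) -> Void) {
    guard n > 0 else { return }
    let cores = ProcessInfo.processInfo.activeProcessorCount
    // One work item per batch, so per-invocation overhead is paid k times, not n times.
    let k = max(1, min(4 * cores, n, Int(Double(n).squareRoot())))
    DispatchQueue.concurrentPerform(iterations: k) { batch in
        // Near-even split; batch sizes differ by at most one element.
        let start = n * batch / k
        let end = n * (batch + 1) / k
        for i in start ..< end { body(i) }
    }
}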


Separately, Dave, it looks like your sketch there uses batches of size b, b, ..., b, b, n%b. When the number of batches is large, this works OK, but in the limit of just a few batches, it leads to very uneven distribution of work. We found it better in Accelerate to allocate work as b+c, ... b+c, b, ... b, where c is often 1, but is sometimes chosen to align work groups to cache lines, etc (this only really matters when the amount of work being done per work item is small).
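
Concretely, a sketch of that allocation with c = 1 (the cache-line-driven choice of c is not shown):

// Split n items into k ranges of size b+1 (the first r of them) or b,
// rather than b, b, ..., b, n % b.
func evenBatches(n: Int, k: Int) -> [Range<Int>] {
    let b = n / k, r = n % k
    var start = 0
    return (0 ..< k).map { i in
        let size = b + (i < r ? 1 : 0)
        defer { start += size }
        return start ..< start + size
    }
}
// evenBatches(n: 10, k: 4) == [0..<3, 3..<6, 6..<8, 8..<10]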


That's only going to be good if the workload you have to perform for each element is really heavy.
I had to do it this way because when Dispatch chooses the batch size, it still calls your closure once for each element and doesn't tell you the batch size. So there's no way to avoid synchronizing on every element, like the first implementation does, and that's horrible unless the per-element workload is large.

I've updated the gist and if you read the comment below you can see the results.

What we really need from Dispatch is for it to choose the batch size, and call our closure with the batch index and size, but it doesn't have that API.


It doesn't actually; the name batchSize is probably ill-chosen. Think of it as batchSizeOrOneLess.

That is interesting! I'm done tuning this casual experiment but if I were to try to eke out the last bit of perf, I'd definitely try that.


Unintentionally deleted this post. It said:

As a follow-up, for posterity, this gist's concurrentMap2 is a pretty good implementation that takes advantage of a newer initializer, to avoid this problem.
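
The gist itself isn't reproduced here, but the initializer in question is presumably Array(unsafeUninitializedCapacity:initializingWith:) from SE-0245 (Swift 5.1), which lets the array own the storage the concurrent writes land in, so nothing needs to be copied or default-initialized afterwards. A sketch of that approach:

import Dispatch

extension RandomAccessCollection {
    func concurrentMapSketch<T>(_ transform: (Element) -> T) -> [T] {
        let n = count
        guard n > 0 else { return [] }
        return Array(unsafeUninitializedCapacity: n) { buffer, initializedCount in
            let base = buffer.baseAddress!
            // Each iteration initializes exactly one distinct slot.
            DispatchQueue.concurrentPerform(iterations: n) { offset in
                (base + offset).initialize(to: transform(self[index(startIndex, offsetBy: offset)]))
            }
            initializedCount = n
        }
    }
}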


One problem I've encountered with DispatchQueue.concurrentPerform(iterations: count) is that it utilises all CPU cores at 100%. While that can be suitable for some situations, it has undesirable consequences for overall app performance: utilising all cores for a very short period is sometimes fine, but freezing the whole app for several seconds is not, and when the amount of data is unknown up front, such a solution can freeze the app for a long time.
The maximum number of CPU cores used often needs to be limited, and there are single-core systems, like the watchOS environment, where utilisation of the only core needs to be limited as well.
So another solution is to split the data into batches and perform each batch operation using DispatchQueue.async or await Task {}, while limiting the number of operations performed concurrently at the same time.
I have never met a general solution suitable for most cases; every time I needed something similar, I ended up with an individual solution and careful tuning, profiling and debugging.
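
For the Task-based variant of that, one common shape is a task group that keeps at most `width` operations in flight, where each completion funds the next item (a sketch, assuming Swift 5.7 or later; the names are mine):

func limitedConcurrentMap<T: Sendable, R: Sendable>(
    _ items: [T],
    width: Int,
    _ transform: @escaping @Sendable (T) -> R
) async -> [R] {
    var results = [R?](repeating: nil, count: items.count)
    await withTaskGroup(of: (Int, R).self) { group in
        var next = 0
        // Seed at most `width` children.
        while next < min(width, items.count) {
            let i = next
            group.addTask { (i, transform(items[i])) }
            next += 1
        }
        // Each completed child starts one more, bounding concurrency at `width`.
        for await (i, value) in group {
            results[i] = value
            if next < items.count {
                let j = next
                group.addTask { (j, transform(items[j])) }
                next += 1
            }
        }
    }
    return results.map { $0! }
}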

I can confirm this finding: DispatchQueue.concurrentPerform is quite a dangerous call with regard to totally seizing the CPU.

BTW, what the heck is this, bolded in the docs:

"Target queue"?! For a class function?!


This is my attempt at a safer (and more async-friendly) version of concurrentPerform:

extension DispatchQueue {
    class func saferConcurrentPerform(iterations: Int, execute: @escaping (Int) -> Void, completion: @escaping () -> Void) {
        guard iterations > 0 else {
            completion()
            return
        }
        let numberOfCores = 8 // TODO: get this properly (e.g. ProcessInfo.processInfo.activeProcessorCount)
        let queueCount = max(0, min(iterations, numberOfCores - 2))
        let queues = (0 ..< queueCount).map { i in
            DispatchQueue(label: "queue-\(i)")
        }
        var completedCount = 0
        
        func queueFinished() {
            dispatchPrecondition(condition: .onQueue(queues[0]))
            completedCount += 1
            if completedCount == queueCount { completion() }
        }
        
        let iterationsPerQueue = (iterations + queueCount - 1) / queueCount
        var from = 0
        
        for queueIndex in 0 ..< queueCount {
            let to = min(iterations, from + iterationsPerQueue)
            queues[queueIndex].async { [to, from] in
                for j in from ..< to {
                    execute(j)
                }
                if queueIndex == 0 {
                    queueFinished()
                } else {
                    queues[0].async {
                        queueFinished()
                    }
                }
            }
            from = to
        }
    }
    class func saferConcurrentPerform(iterations: Int, execute: @escaping (Int) -> Void) async {
        await withCheckedContinuation { continuation in
            saferConcurrentPerform(iterations: iterations, execute: execute) {
                continuation.resume(returning: ())
            }
        }
    }
}

This could be used without a fear of seizing the CPU completely.

I had to put the internal stuff within DispatchQueue.global().async { ... }, as otherwise it was blocking the current queue for a significant amount of time... surprisingly, queue.async was not returning "immediately" after queueing the work, as I expected.

† - a better version would count the number of tasks submitted per queue and submit the work on the least busy queue instead of submitting it on the next queue (as per the round robin algorithm).
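
A sketch of the footnote's idea (a hypothetical helper, not part of the code above): per-queue pending counts, read and updated only on a serial state queue so no lock is needed:

import Dispatch

final class LeastBusyPool {
    private let queues: [DispatchQueue]
    private var pending: [Int]
    private let state = DispatchQueue(label: "pool-state")

    init(queueCount: Int) {
        precondition(queueCount > 0)
        queues = (0 ..< queueCount).map { DispatchQueue(label: "pool-\($0)") }
        pending = Array(repeating: 0, count: queueCount)
    }

    func async(_ work: @escaping () -> Void) {
        state.async {
            // Pick the queue with the fewest outstanding work items.
            let i = self.pending.indices.min { self.pending[$0] < self.pending[$1] }!
            self.pending[i] += 1
            self.queues[i].async {
                work()
                self.state.async { self.pending[i] -= 1 }
            }
        }
    }
}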

Edit: changed implementation, now the queueing is "instantaneous". Got rid of the lock, it was unnecessary.

It could also make use of Sendable to ensure the elements do not contain unsynchronised shared mutable state, and that the transform closure is safe to call from multiple threads (even though it uses Dispatch rather than Swift Concurrency's native Tasks, I think Sendable would still be appropriate).

This was actually mentioned as an example in the proposal which introduced Sendable:

While the Sendable protocol directly addresses value types and allows classes to opt-in to participation with the concurrency system, function types are also important reference types that cannot currently conform to protocols. Functions in Swift occur in several forms, including global func declarations, nested functions, accessors (getters, setters, subscripts, etc), and closures. It is useful and important to allow functions to be passed across concurrency domains where possible to allow higher order functional programming techniques in the Swift Concurrency model, for example to allow definition of parallelMap and other obvious concurrency constructs.

New @Sendable attribute for functions
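
Applied to the Dispatch-based concurrentMap from earlier in the thread, that might look like this (a sketch, assuming Swift 5.5+; the Sendable constraints are the point, and the body just restates the pointer version with balanced deinitialization):

import Dispatch

extension RandomAccessCollection where Self: Sendable, Element: Sendable {
    func concurrentMap<T: Sendable>(_ transform: @Sendable (Element) -> T) -> [T] {
        let n = count
        let p = UnsafeMutablePointer<T>.allocate(capacity: n)
        defer {
            p.deinitialize(count: n) // balance the initialize(to:) calls before freeing
            p.deallocate()
        }
        DispatchQueue.concurrentPerform(iterations: n) { offset in
            (p + offset).initialize(to: transform(self[index(startIndex, offsetBy: offset)]))
        }
        return Array(UnsafeMutableBufferPointer(start: p, count: n))
    }
}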

I know this is a very old thread and the gist you linked to is also many years old, but since you mentioned new APIs which allow for better performance, I thought I'd also take the opportunity to mention the new APIs/language features which provide enhanced safety.

The gist had to rely on documentation and developer vigilance:

/// - Requires: transform is safe to call from multiple threads.

this seems like something inherited from the docs for dispatch_apply that doesn't make as much sense in this context. reported a suggestion to improve this here.

this presumably is not what you were going for to upper bound core usage, since a large iteration count will 'win'. should this instead be something like max(1, min(iterations, numberOfCores - 2))?

perhaps this thread is not the place to elaborate on this, but if this is true it would seemingly imply something is very broken with my mental model of Dispatch. i would personally be curious how you encountered this behavior (i did not manage to reproduce it when testing your code snippets).


Thank you! Fixed the code above (also added some log statements to see how long it takes, see below).

Or maybe that's a Dispatch limitation... Try this fragment:

DispatchQueue.saferConcurrentPerform(iterations: 1000) { _ in
    usleep(1000_000_000)
} completion: {
}

With 1000 iterations it's quick (a millisecond). But if you try it with an iteration count between 10K and 100K, you'll see that it takes too long for the main thread to cope with (0.1 sec for 10K ... 12 sec for 100K), and for that matter the growth is not quite linear:

i tested this locally and similarly saw superlinear growth. IMO it appears likely to be an artifact of the debugging configuration to enable 'backtrace recording' for GCD queues. disabling that setting (as described here) dramatically improved the performance of large iteration counts.
