I needed a concurrent map recently. Part of a program had to read chunks
of data, process them concurrently, and assemble the results in an array.
This isn’t as straightforward as it sounds, because arrays are value
types. I came up with the following snippet, which I’d like to check for
correctness; it could also be helpful to others.
Perhaps this is something Dispatch should provide out of the box?
- Karl
Ah, one second: I was refactoring this and forgot to test it. Here’s the actual code:
A map presumably requires an input.
DispatchQueue.concurrentMap maps a Range<Int> -> T, but since the range is always 0..<n, we only ask for the value of n. It could also quite naturally be written as an extension on Range, with everything else built on top of it.
extension DispatchQueue {
    static func concurrentMap<T>(iterations: Int, execute block: (Int) -> T) -> [T] {
        let __result = UnsafeMutableRawBufferPointer.allocate(count: iterations * MemoryLayout<T>.stride)
        defer { __result.deallocate() }
        let _result = __result.baseAddress?.assumingMemoryBound(to: T.self)
You never bound the memory to T, so this will be undefined behavior.
        let result = UnsafeMutableBufferPointer<T>(start: _result, count: iterations)
        concurrentPerform(iterations: iterations) { idx in
            result[idx] = block(idx)
You also never initialized the Ts in that memory region, so assigning
into them will also be undefined behavior.
        }
        return Array(result)
    }
}
extension Array {
    func concurrentMap<T>(execute block: (Element)->T) -> [T] {
        return DispatchQueue.concurrentMap(iterations: count) { block(self[$0]) }
    }
}
Unfortunately I don’t think there’s a way to get an array to take over a +1
UnsafeMutableBufferPointer without copying.
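For comparison, here is a minimal sketch of Karl's DispatchQueue.concurrentMap with the two problems above addressed: the raw bytes are explicitly bound to T, and each element is initialized (not assigned) before anything reads it. It assumes the current Swift 5 pointer API (`UnsafeMutableRawPointer.allocate(byteCount:alignment:)`, `bindMemory(to:capacity:)`, `deallocate()`) rather than the Swift 3 calls used in this thread, and it still copies the results into the returned Array.

```swift
import Dispatch

extension DispatchQueue {
    static func concurrentMap<T>(iterations: Int, execute block: (Int) -> T) -> [T] {
        // Raw, untyped storage large enough for `iterations` values of T.
        let raw = UnsafeMutableRawPointer.allocate(
            byteCount: iterations * MemoryLayout<T>.stride,
            alignment: MemoryLayout<T>.alignment)
        // Bind the bytes to T before accessing them through a typed pointer.
        let typed = raw.bindMemory(to: T.self, capacity: iterations)
        defer {
            // Runs after the Array below has copied the elements:
            // destroy the Ts, then free the raw storage.
            typed.deinitialize(count: iterations)
            raw.deallocate()
        }
        concurrentPerform(iterations: iterations) { idx in
            // initialize(to:) constructs a T in uninitialized memory;
            // assignment would first try to destroy a value that isn't there.
            (typed + idx).initialize(to: block(idx))
        }
        return Array(UnsafeMutableBufferPointer(start: typed, count: iterations))
    }
}
```

Each iteration writes a distinct slot, so the concurrent iterations never touch the same memory.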
The only correct way to do this without creating intermediate storage is
to have a way to initialize your result elements, e.g.:
import Dispatch
protocol DefaultInitializable {
    init()
}
extension RandomAccessCollection {
    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T]
    where T : DefaultInitializable {
        var result = Array(
            repeating: T(), count: numericCast(self.count))
        DispatchQueue.concurrentPerform(iterations: result.count) {
            offset in
            result[offset] = transform(
                self[index(startIndex, offsetBy: numericCast(offset))])
        }
        return result
    }
}
extension Int : DefaultInitializable { }
print((3..<20).concurrentMap { $0 * 2 })
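A caveat when reading the version above against later Swift: since exclusive-access enforcement (SE-0176, Swift 4), having every iteration assign through the one captured `var result` is, strictly speaking, a set of overlapping modifying accesses to the same variable. A common way to avoid that is to borrow the array's storage once with `withUnsafeMutableBufferPointer` and let each iteration write only its own slot. This is a sketch assuming Swift 5; the `DefaultInitializable` protocol is Dave's.

```swift
import Dispatch

protocol DefaultInitializable {
    init()
}
extension Int: DefaultInitializable {}

extension RandomAccessCollection {
    func concurrentMap<T: DefaultInitializable>(_ transform: (Element) -> T) -> [T] {
        // Every slot starts out holding an initialized T().
        var result = Array(repeating: T(), count: count)
        result.withUnsafeMutableBufferPointer { buffer in
            // Copy of the buffer *pointer*, not of the elements.
            let slots = buffer
            DispatchQueue.concurrentPerform(iterations: slots.count) { offset in
                // Assignment is valid here because the slot is already
                // initialized; no iteration touches `result` itself.
                slots[offset] = transform(self[index(startIndex, offsetBy: offset)])
            }
        }
        return result
    }
}
```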
I had a go at doing that before, using Optional<T> and unwrapping at the end, but it occurred to me that it would be very inefficient for things like Optional<Int> and would introduce more allocations.
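The Optional attempt itself isn't posted, so the following is only a hypothetical reconstruction for comparison (the name `concurrentMapViaOptional` is made up). It fills an intermediate `[T?]` and then unwraps it into a second array, which is where the extra allocation comes from. The layout cost is visible with `MemoryLayout`: an `Int?` carries an extra tag byte that is padded out to Int's alignment, so each intermediate slot takes twice the stride of an `Int`.

```swift
import Dispatch

extension RandomAccessCollection {
    // Hypothetical helper: Optional slots first, then unwrap.
    func concurrentMapViaOptional<T>(_ transform: (Element) -> T) -> [T] {
        // First allocation: every slot starts as nil.
        var slots = [T?](repeating: nil, count: count)
        slots.withUnsafeMutableBufferPointer { buffer in
            let out = buffer
            DispatchQueue.concurrentPerform(iterations: out.count) { offset in
                out[offset] = transform(self[index(startIndex, offsetBy: offset)])
            }
        }
        // Second allocation: copy out of the Optional wrappers.
        return slots.map { $0! }
    }
}

// Per-element cost of the intermediate array versus the final one.
let intStride = MemoryLayout<Int>.stride
let optionalIntStride = MemoryLayout<Int?>.stride
```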
If you don't want the DefaultInitializable requirement (or some other
way to prepare initialized elements), you'll need to manage memory
yourself:
extension RandomAccessCollection {
    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T] {
        let n = numericCast(self.count) as Int
        let p = UnsafeMutablePointer<T>.allocate(capacity: n)
        defer {
            // The Array below copies the elements, so the originals must be
            // destroyed before the memory is freed; otherwise they leak.
            p.deinitialize(count: n)
            p.deallocate(capacity: n)
        }
        DispatchQueue.concurrentPerform(iterations: n) {
            offset in
            (p + offset).initialize(
                to: transform(
                    self[index(startIndex, offsetBy: numericCast(offset))]))
        }
        return Array(UnsafeMutableBufferPointer(start: p, count: n))
    }
}
This posting highlights a couple of weaknesses in the standard library
for which I'd appreciate bug reports:
1. No way to arbitrarily initialize an Array's storage.
2. UnsafeMutableBufferPointer doesn't have an allocating init.
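The second gap was later filled: SE-0184 (Swift 4.1) added a static `UnsafeMutableBufferPointer.allocate(capacity:)` along with `deallocate()` on buffer pointers. Below is a sketch of the manual-memory version rewritten against that API, assuming Swift 4.1 or later. It still has to copy the elements into the returned Array.

```swift
import Dispatch

extension RandomAccessCollection {
    func concurrentMap<T>(_ transform: (Element) -> T) -> [T] {
        // One call allocates typed, uninitialized storage for `count` Ts.
        let storage = UnsafeMutableBufferPointer<T>.allocate(capacity: count)
        defer {
            // Runs after the Array below has copied the elements.
            if let base = storage.baseAddress {
                base.deinitialize(count: storage.count)
            }
            storage.deallocate()
        }
        DispatchQueue.concurrentPerform(iterations: storage.count) { offset in
            // Each iteration initializes exactly one distinct slot.
            (storage.baseAddress! + offset).initialize(
                to: transform(self[index(startIndex, offsetBy: offset)]))
        }
        // Still a copy: the Array cannot adopt this buffer.
        return Array(storage)
    }
}
```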
Thanks!
--
-Dave
_______________________________________________
swift-users mailing list
swift-users@swift.org
https://lists.swift.org/mailman/listinfo/swift-users
Filed:
1. [SR-3087] No way to arbitrarily initialise an Array's storage (apple/swift issue #45677)
2. [SR-3088] UnsafeMutableBufferPointer doesn't have an allocating init (apple/swift issue #45678)
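The gap described in SR-3087 was later filled by SE-0245 (Swift 5.1), which added `Array(unsafeUninitializedCapacity:initializingWith:)`. Its closure receives the new Array's own uninitialized storage, so the results can be written in place instead of being copied out of a temporary buffer. A minimal sketch, assuming Swift 5.1 or later:

```swift
import Dispatch

extension RandomAccessCollection {
    func concurrentMap<T>(_ transform: (Element) -> T) -> [T] {
        let n = count
        return [T](unsafeUninitializedCapacity: n) { buffer, initializedCount in
            // With n == 0 there may be no storage; leave the count at 0.
            guard let base = buffer.baseAddress else { return }
            DispatchQueue.concurrentPerform(iterations: n) { offset in
                // Initialize one distinct slot directly inside the Array.
                (base + offset).initialize(
                    to: transform(self[index(startIndex, offsetBy: offset)]))
            }
            // Tell the Array how many elements are now initialized.
            initializedCount = n
        }
    }
}
```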
What is your opinion on the corelibs extending the standard library types? Foundation does it to provide APIs from NSString, but it’s kind of a special case. Would it be reasonable for Dispatch (which is not _such_ a special case) to also extend types like Range and Collection?
I quite like the API as an extension on Range. I think it would be a nice addition to Dispatch (once we start allowing additive proposals):
extension Range where Bound : Strideable, Bound.Stride : SignedInteger {
    func concurrentMap<T>(_ transform: (Bound) -> T) -> [T] {
        let n = numericCast(count) as Int
        let buffer = UnsafeMutablePointer<T>.allocate(capacity: n)
        DispatchQueue.concurrentPerform(iterations: n) {
            (buffer + $0).initialize(to: transform(lowerBound.advanced(by: numericCast($0))))
        }
        // Unfortunately, the buffer is copied when making it an Array<T>,
        // so the originals must be deinitialized before the memory is freed.
        defer {
            buffer.deinitialize(count: n)
            buffer.deallocate(capacity: n)
        }
        return Array(UnsafeMutableBufferPointer<T>(start: buffer, count: n))
    }
}
extension Collection {
    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T] {
        // ‘as Range’ because CountableRange is a collection, causing the function to be recursive.
        return ((0..<numericCast(count)) as Range).concurrentMap {
            transform(self[index(startIndex, offsetBy: numericCast($0))])
        }
    }
}
Thanks
- Karl
On 30 Oct 2016, at 19:23, Dave Abrahams via swift-users <swift-users@swift.org> wrote:
on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:
On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users@gmail.com> wrote: