Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Concurrent Map Implementations, Benchmarked
// See commentary below this gist.
import Foundation
import QuartzCore
// Implementation from https://talk.objc.io/episodes/S01E90-concurrent-map
/// A mutable box whose contents are only read or modified from inside
/// synchronous blocks submitted to a private dispatch queue.
public final class ThreadSafe<A> {
  /// Backing storage; use `value` or `atomically(_:)` instead of touching it directly.
  var _value: A

  /// The queue on which every access to `_value` is performed.
  let queue = DispatchQueue(label: "ThreadSafe")

  /// Creates a box that initially holds `value`.
  init(_ value: A) {
    _value = value
  }

  /// A copy of the stored value, fetched synchronously on `queue`.
  var value: A {
    queue.sync { self._value }
  }

  /// Runs `transform` synchronously on `queue`, handing it in-place access
  /// to the stored value.
  func atomically(_ transform: (inout A) -> ()) {
    queue.sync {
      transform(&_value)
    }
  }
}
extension Array {
  /// Returns `self.map(transform)`, computed in parallel, with every result
  /// written into a shared `ThreadSafe` buffer.
  ///
  /// The elements are divided into contiguous chunks, one per iteration of
  /// `DispatchQueue.concurrentPerform`.
  ///
  /// - Parameters:
  ///   - nthreads: The requested number of chunks. Defaults to one chunk per
  ///     element. The value is clamped to `1...count`.
  ///   - transform: The mapping applied to each element.
  /// - Returns: The transformed elements, in the same order as `self`.
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap1<B>(nthreads: Int? = nil, _ transform: (Element) -> B) -> [B] {
    // Nothing to map; this also keeps the chunk arithmetic below from
    // dividing by zero.
    if isEmpty { return [] }

    let result = ThreadSafe([B?](repeating: nil, count: count))
    // `Swift.` is required: inside a Sequence extension, bare `min`/`max`
    // resolve to the instance methods.
    let chunkCount = Swift.max(1, Swift.min(nthreads ?? count, count))
    let chunkSize = (count - 1) / chunkCount + 1

    DispatchQueue.concurrentPerform(iterations: chunkCount) { i in
      let lower = i * chunkSize
      // Because chunkSize is rounded up, trailing chunks can start past the
      // end; forming `lower..<upper` for them would trap.
      guard lower < count else { return }
      let upper = Swift.min(lower + chunkSize, count)
      for idx in lower..<upper {
        let transformed = transform(self[idx])
        result.atomically { $0[idx] = transformed }
      }
    }
    // Every slot in 0..<count was filled by exactly one chunk above.
    return result.value.map { $0! }
  }
}
// My generic implementation
extension RandomAccessCollection {
  /// Applies `transform` to every element in parallel and returns the
  /// results in order, exactly as `self.map(transform)` would.
  ///
  /// The work is divided into `ceil(count / minBatchSize)` batches of nearly
  /// equal size; when that yields fewer than two batches the collection is
  /// mapped sequentially instead.
  ///
  /// - Parameters:
  ///   - minBatchSize: Divisor used to pick the number of batches; must be
  ///     at least 1.
  ///   - transform: The mapping applied to each element.
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap2<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
    precondition(minBatchSize >= 1)
    let elementCount = count
    let batches = (elementCount + minBatchSize - 1) / minBatchSize
    guard batches >= 2 else { return map(transform) }

    return [B](unsafeUninitializedCapacity: elementCount) { buffer, initializedCount in
      // Non-nil here: at least two batches implies at least two elements.
      let storage = buffer.baseAddress!
      DispatchQueue.concurrentPerform(iterations: batches) { batch in
        // Each batch initializes its own disjoint slice of the buffer.
        let lowerOffset = batch * elementCount / batches
        let upperOffset = (batch + 1) * elementCount / batches
        var position = index(startIndex, offsetBy: lowerOffset)
        for offset in lowerOffset..<upperOffset {
          (storage + offset).initialize(to: transform(self[position]))
          formIndex(after: &position)
        }
      }
      initializedCount = elementCount
    }
  }
}
// This ought to be an optimization, but it doesn't seem to be!
extension Array {
  /// Maps `transform` over the elements in parallel by running
  /// `concurrentMap2` on an unsafe buffer view of the array's storage.
  ///
  /// - Parameter transform: The mapping applied to each element.
  /// - Returns: The same result as `self.map(transform)`.
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap3<B>(_ transform: (Element) -> B) -> [B] {
    return withUnsafeBufferPointer { storage in
      storage.concurrentMap2(transform)
    }
  }
}
// Implementation with no unsafe constructs.
extension RandomAccessCollection {
  /// Returns `self.map(transform)`, computed in parallel without any unsafe
  /// constructs.
  ///
  /// The collection is cut into `ceil(count / minBatchSize)` slices of
  /// nearly equal size. Each slice is mapped on its own iteration of
  /// `DispatchQueue.concurrentPerform`, stored into a `ThreadSafe` array of
  /// per-batch results, and the batches are concatenated at the end. When
  /// fewer than two batches would result, the collection is mapped
  /// sequentially.
  ///
  /// - Parameters:
  ///   - minBatchSize: Divisor used to pick the number of batches; must be
  ///     at least 1. Defaults to 4096.
  ///   - transform: The mapping applied to each element.
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap4<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
    precondition(minBatchSize >= 1)
    let n = count
    let batchCount = (n + minBatchSize - 1) / minBatchSize
    guard batchCount >= 2 else { return map(transform) }

    // `ThreadSafe` is a class: the reference is never reassigned, only the
    // value it wraps is mutated, so `let` suffices.
    let batches = ThreadSafe(
      ContiguousArray<[B]?>(repeating: nil, count: batchCount))

    /// The index at which batch `b` begins; `batchStart(batchCount)` is `endIndex`.
    func batchStart(_ b: Int) -> Index {
      index(startIndex, offsetBy: b * n / batchCount)
    }

    DispatchQueue.concurrentPerform(iterations: batchCount) { b in
      // Do the mapping outside the lock; only the store is serialized.
      let batch = self[batchStart(b)..<batchStart(b + 1)].map(transform)
      batches.atomically { $0[b] = batch }
    }
    // Every slot was filled by exactly one iteration above.
    return batches.value.flatMap { $0! }
  }
}
/// Times `Array.map` and each `concurrentMapN` variant applying `transform`
/// to the integers `0...count`, prints the elapsed time of each, and aborts
/// if any concurrent variant produces a result different from `Array.map`.
///
/// - Parameters:
///   - count: Inclusive upper bound of the input range, so the input has
///     `count + 1` elements. Must be non-negative.
///   - transform: The operation being benchmarked.
func test(count: Int, _ transform: (Int) -> Int) {
  precondition(count >= 0)
  // Size the input from `count` rather than a fixed constant, so callers
  // actually control how much work is measured.
  let hugeCollection = 0...count
  let hugeArray = Array(hugeCollection)

  /// Runs `f` once and returns its result together with the elapsed time
  /// as measured by `CACurrentMediaTime`.
  func time<R>(_ f: () -> R) -> (time: Double, result: R) {
    let startTime = CACurrentMediaTime()
    let r = f()
    let t = CACurrentMediaTime() - startTime
    return (t, r)
  }

  let (t0, r0) = time { hugeArray.map(transform) }
  print("sequential map time:", t0, "(the one to beat)")
  let (t1, r1) = time { hugeArray.concurrentMap1(transform) }
  print("concurrentMap1 time:", t1)
  let (t2, r2) = time { hugeArray.concurrentMap2(transform) }
  print("concurrentMap2 time:", t2)
  let (t3, r3) = time { hugeArray.concurrentMap3(transform) }
  print("concurrentMap3 time:", t3)
  let (t4, r4) = time { hugeArray.concurrentMap4(transform) }
  print("concurrentMap4 time:", t4)

  // Verify only after all timings are printed, so a failure doesn't hide
  // the measurements.
  if r1 != r0 { fatalError("bad implementation 1") }
  if r2 != r0 { fatalError("bad implementation 2") }
  if r3 != r0 { fatalError("bad implementation 3") }
  if r4 != r0 { fatalError("bad implementation 4") }
}
// Benchmark driver: one run with a trivial transform to expose per-element
// overhead, then five runs whose transform gets progressively more expensive.
let N = 65536
print("* Testing a fast operation, to show overhead")
test(count: N * 10) { $0 &+ 1 }
print()
print("* Testing slow operations")
let bigArray = Array(0...N)
for shift in 0..<5 {
  // Workload sizes double each round: N/16, N/8, N/4, N/2, N.
  let M = (N >> 4) << shift
  let workload = bigArray.prefix(M)
  // `print` already separates its arguments with a space.
  print("- workload size:", workload.count)
  // Each transform call sums the whole workload, starting from the element.
  test(count: N) { workload.reduce($0, &+) }
}
@dabrahams

This comment has been minimized.

Copy link
Owner Author

@dabrahams dabrahams commented Mar 13, 2020

These results show that:

% swiftc -O concmap.swift -o /tmp/concmap && /tmp/concmap
* Testing a fast operation, to show overhead
sequential map time: 0.0047810429823584855 (the one to beat)
concurrentMap1 time: 2.6848530589486472
concurrentMap2 time: 0.0006109760142862797
concurrentMap3 time: 0.0018018749542534351
concurrentMap4 time: 0.006668329006060958

* Testing slow operations
- worklaod size:  4096
sequential map time: 0.2821901539573446 (the one to beat)
concurrentMap1 time: 2.7502949939807877
concurrentMap2 time: 0.03729481704067439
concurrentMap3 time: 0.03941335098352283
concurrentMap4 time: 0.03851412795484066
- worklaod size:  8192
sequential map time: 0.8860503709875047 (the one to beat)
concurrentMap1 time: 2.744172142003663
concurrentMap2 time: 0.10571377602173015
concurrentMap3 time: 0.09992507298011333
concurrentMap4 time: 0.10328929399838671
- worklaod size:  16384
sequential map time: 1.588162227999419 (the one to beat)
concurrentMap1 time: 2.6967784829903394
concurrentMap2 time: 0.1963050719932653
concurrentMap3 time: 0.1879734710091725
concurrentMap4 time: 0.20308988098986447
- worklaod size:  32768
sequential map time: 3.226483370002825 (the one to beat)
concurrentMap1 time: 2.6823930320097134
concurrentMap2 time: 0.40369256696430966
concurrentMap3 time: 0.40157766197808087
concurrentMap4 time: 0.4020686270087026
- worklaod size:  65536
sequential map time: 7.184593409008812 (the one to beat)
concurrentMap1 time: 2.820290855015628
concurrentMap2 time: 0.946223387029022
concurrentMap3 time: 0.945886077999603
concurrentMap4 time: 1.0435081740142778
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.