Skip to content

Instantly share code, notes, and snippets.

@dabrahams
Last active April 3, 2024 04:59
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save dabrahams/ea5495b4cccc2970cd56e8cfc72ca761 to your computer and use it in GitHub Desktop.
Save dabrahams/ea5495b4cccc2970cd56e8cfc72ca761 to your computer and use it in GitHub Desktop.
Concurrent Map Implementations, Benchmarked
// See commentary below this gist.
import Foundation
import QuartzCore
// Implementation from https://talk.objc.io/episodes/S01E90-concurrent-map
public final class ThreadSafe<A> {
    // The protected value; only ever read or written on `queue`.
    var _value: A
    // Serial queue that serializes every access to `_value`.
    let queue = DispatchQueue(label: "ThreadSafe")

    /// Wraps `value` for serialized access.
    init(_ value: A) {
        self._value = value
    }

    /// A snapshot of the current value, read under the queue.
    var value: A {
        queue.sync { self._value }
    }

    /// Runs `transform` with exclusive access to the wrapped value.
    func atomically(_ transform: (inout A) -> ()) {
        queue.sync {
            transform(&self._value)
        }
    }
}
extension Array {
    /// Returns `self.map(transform)`, computed in parallel across `nthreads`
    /// chunks (defaults to one chunk per element).
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap1<B>(nthreads: Int? = nil, _ transform: (Element) -> B) -> [B] {
        // Bug fix: the original computed `cs = (count-1)/nt+1` unconditionally,
        // which divides by zero for an empty array when `nthreads` is nil.
        if isEmpty { return [] }
        let nt = nthreads ?? count
        precondition(nt >= 1, "need at least one thread")
        let result = ThreadSafe(Array<B?>(repeating: nil, count: count))
        let cs = (count - 1) / nt + 1  // chunk size, rounded up
        DispatchQueue.concurrentPerform(iterations: nt) { i in
            let lo = i * cs
            // Bug fix: when nt > count, trailing chunks start past the end;
            // the original formed an invalid range (lo..<count with lo > count)
            // and trapped. Skip those chunks instead.
            guard lo < count else { return }
            let hi = Swift.min(lo + cs, count)
            for idx in lo..<hi {
                let transformed = transform(self[idx])
                // Only the slot write is serialized; `transform` runs outside
                // the lock so chunks proceed in parallel.
                result.atomically { $0[idx] = transformed }
            }
        }
        // Every index was assigned exactly once above, so the unwrap is safe.
        return result.value.map { $0! }
    }
}
// My generic implementation
extension RandomAccessCollection {
/// Returns `self.map(transform)`, computed in parallel.
///
/// Splits the collection into batches of at least `minBatchSize` elements
/// and has each batch initialize a disjoint sub-range of the result
/// array's uninitialized storage directly — no locking is needed.
///
/// - Requires: `transform` is safe to call from multiple threads.
func concurrentMap2<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
precondition(minBatchSize >= 1)
let n = self.count
// Round up so a final partial batch is still processed.
let batchCount = (n + minBatchSize - 1) / minBatchSize
// Too little work to amortize thread overhead: map sequentially.
if batchCount < 2 { return self.map(transform) }
return Array(unsafeUninitializedCapacity: n) {
uninitializedMemory, resultCount in
resultCount = n
let baseAddress = uninitializedMemory.baseAddress!
DispatchQueue.concurrentPerform(iterations: batchCount) { b in
// Batch b covers offsets [b*n/batchCount, (b+1)*n/batchCount).
// These half-open ranges tile 0..<n exactly, so every slot is
// initialized exactly once and batches never overlap.
let startOffset = b * n / batchCount
let endOffset = (b + 1) * n / batchCount
var sourceIndex = index(self.startIndex, offsetBy: startOffset)
for p in baseAddress+startOffset..<baseAddress+endOffset {
// initialize(to:), not assignment: the memory is raw, so there
// is no prior value for an assignment to destroy.
p.initialize(to: transform(self[sourceIndex]))
formIndex(after: &sourceIndex)
}
}
}
}
}
// This oughta be an optimization, but doesn't seem to be!
extension Array {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// Delegates to `concurrentMap2` over the array's contiguous buffer,
    /// intended to skip per-element Array overhead (though the note above
    /// this extension reports no measured speedup).
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap3<B>(_ transform: (Element) -> B) -> [B] {
        return self.withUnsafeBufferPointer { elements in
            elements.concurrentMap2(transform)
        }
    }
}
// Implementation with no unsafe constructs.
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// Avoids unsafe constructs: each batch produces its own `[B]` via plain
    /// `map`, the batches are stored under a lock, and the results are
    /// concatenated at the end.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap4<B>(_ transform: (Element) -> B) -> [B] {
        let batchSize = 4096 // Tune this
        let n = self.count
        // Round up so a final partial batch is still processed.
        let batchCount = (n + batchSize - 1) / batchSize
        // Too little work to amortize thread overhead: map sequentially.
        if batchCount < 2 { return self.map(transform) }
        // Fix: `let`, not `var` — ThreadSafe is a reference type and the
        // binding itself is never mutated (the original drew a compiler
        // warning and implied value-type mutation that never happens).
        let batches = ThreadSafe(
            ContiguousArray<[B]?>(repeating: nil, count: batchCount))
        /// Start index of batch `b`; batch ranges tile the collection exactly.
        func batchStart(_ b: Int) -> Index {
            index(startIndex, offsetBy: b * n / batchCount)
        }
        DispatchQueue.concurrentPerform(iterations: batchCount) { b in
            let batch = self[batchStart(b)..<batchStart(b + 1)].map(transform)
            batches.atomically { $0[b] = batch }
        }
        // Every slot was filled exactly once above, so the unwrap is safe.
        return batches.value.flatMap { $0! }
    }
}
/// Benchmarks the four concurrent map implementations against sequential
/// `Array.map` on `count` elements, checking that all results agree.
///
/// - Requires: `transform` is safe to call from multiple threads.
func test(count: Int, _ transform: (Int) -> Int) {
    // Bug fix: the original ignored its `count` parameter entirely and
    // always benchmarked a hard-coded 0...655360 collection, so the two
    // call sites below requesting different sizes measured the same input.
    let hugeCollection = 0..<count
    let hugeArray = Array(hugeCollection)
    /// Runs `f` once, returning its wall-clock duration and result.
    func time<R>(_ f: () -> R) -> (time: Double, result: R) {
        let startTime = CACurrentMediaTime()
        let r = f()
        let t = CACurrentMediaTime() - startTime
        return (t, r)
    }
    let (t0, r0) = time { hugeArray.map(transform) }
    print("sequential map time:", t0, "(the one to beat)")
    let (t1, r1) = time { hugeArray.concurrentMap1(transform) }
    print("concurrentMap1 time:", t1)
    let (t2, r2) = time { hugeArray.concurrentMap2(transform) }
    print("concurrentMap2 time:", t2)
    let (t3, r3) = time { hugeArray.concurrentMap3(transform) }
    print("concurrentMap3 time:", t3)
    let (t4, r4) = time { hugeArray.concurrentMap4(transform) }
    print("concurrentMap4 time:", t4)
    // Correctness check: every implementation must match the baseline.
    if r1 != r0 { fatalError("bad implementation 1") }
    if r2 != r0 { fatalError("bad implementation 2") }
    if r3 != r0 { fatalError("bad implementation 3") }
    if r4 != r0 { fatalError("bad implementation 4") }
}
// MARK: - Benchmark driver

let N = 65536

print("* Testing a fast operation, to show overhead")
test(count: N * 10) { $0 &+ 1 }
print()

print("* Testing slow operations")
let bigArray = Array(0...N)
for shift in 0..<5 {
    // Workload sizes double each pass: N/16, N/8, N/4, N/2, N.
    let M = (N >> 4) << shift
    let workload = bigArray.prefix(M)
    // Typo fix in the printed label: "worklaod" -> "workload".
    print("- workload size: ", workload.count)
    // Each mapped element reduces over the whole workload, making the
    // per-element transform deliberately expensive.
    test(count: N) { workload.reduce($0, &+) }
}
@DenTelezhkin
Copy link

Update: it turns out my implementation was not actually cancelling anything, and when I did make it cancel, everything crashed, as expected. So far, the only reasonable thing I have come up with is initializing with bogus values instead of running the expensive transform operation:

extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// Once `token` is cancelled, remaining slots are filled with
    /// `bogusValue` instead of calling `transform`: the
    /// `unsafeUninitializedCapacity` buffer must still be fully
    /// initialized before the array can be returned, so cancellation can
    /// only cheapen the work, not skip it.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    /// - NOTE(review): `CancellationToken` is declared elsewhere; this code
    ///   assumes its `cancelled` flag is safe to read from multiple
    ///   threads — confirm against its definition.
    func concurrentMap<B>(minBatchSize: Int = 4096, token: CancellationToken = .init(), bogusValue: B, _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        // Round up so a final partial batch is still processed.
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        if batchCount < 2 { return self.map(transform) }
        
        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!
            
            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                // Each batch initializes a disjoint slice of the buffer,
                // so no synchronization between batches is needed.
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress+startOffset..<baseAddress+endOffset {
                    if token.cancelled {
                        // Cancelled: a cheap placeholder keeps the memory
                        // fully initialized (required before returning).
                        p.initialize(to: bogusValue)
                    } else {
                        p.initialize(to: transform(self[sourceIndex]))
                    }
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }

    /// Runs `concurrentMap` on a global queue and delivers the result on
    /// the main queue, passing `token` back so the caller can decide
    /// whether the (possibly bogus-filled) result is still wanted.
    func asyncConcurrentMap<T>(qos: DispatchQoS.QoSClass = .userInitiated, token: CancellationToken, bogusValue: T, transform: @escaping (Element) -> T, completion: @escaping ([T], CancellationToken) -> ()) {
        DispatchQueue.global(qos: qos).async {
            let result = self.concurrentMap(token: token, bogusValue: bogusValue, transform)
            
            DispatchQueue.main.async {
                completion(result, token)
            }
        }
    }
}

Not a fan of how this API reads, but at least it's safe, doesn't crash and actually improves performance by not running transform operation.

@dabrahams
Copy link
Author

Sorry @DenTelezhkin ; I missed your remarks when you posted them.

@DenTelezhkin
Copy link

That's fine =) Thanks for the provided solutions anyway. The project this was needed in was finished a long time ago, and my last implementation seemed to work pretty well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment