Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Concurrent Map Implementations, Benchmarked
// See commentary below this gist.
import Foundation
import QuartzCore
// Implementation from https://talk.objc.io/episodes/S01E90-concurrent-map
/// A box around a value of type `A` whose reads and mutations are all
/// funnelled through one private dispatch queue via `queue.sync`.
public final class ThreadSafe<A> {
  /// Backing storage; use `value` or `atomically(_:)` rather than touching it directly.
  var _value: A

  /// The queue on which every access to `_value` is performed.
  let queue = DispatchQueue(label: "ThreadSafe")

  /// Wraps `value`.
  init(_ value: A) {
    _value = value
  }

  /// The current contents, read synchronously on `queue`.
  var value: A {
    get {
      return queue.sync { self._value }
    }
  }

  /// Runs `transform` on the stored value, synchronously on `queue`, so the
  /// whole mutation happens as one step relative to other accesses.
  func atomically(_ transform: (inout A) -> Void) {
    queue.sync {
      transform(&self._value)
    }
  }
}
extension Array {
  /// Returns `self.map(transform)`, computed by splitting the array into
  /// contiguous chunks that are processed in parallel.
  ///
  /// Each transformed element is stored into a shared result array through
  /// `ThreadSafe.atomically`, one element at a time.
  ///
  /// - Parameters:
  ///   - nthreads: The number of chunks handed to
  ///     `DispatchQueue.concurrentPerform`. Defaults to `count` (one chunk per
  ///     element). The value is clamped to `1...count`.
  ///   - transform: The mapping applied to each element; it is invoked from
  ///     multiple threads.
  /// - Returns: The transformed elements, in the original order.
  func concurrentMap1<B>(nthreads: Int? = nil, _ transform: (Element) -> B) -> [B] {
    // An empty array would otherwise make the chunk-size division below
    // divide by zero (the default chunk count is `count`).
    if isEmpty { return [] }

    let result = ThreadSafe([B?](repeating: nil, count: count))

    // Clamp so the divisor is never zero and there are never more chunks
    // than elements. `Swift.` is required because `Array` has its own
    // `min()`/`max()` members.
    let chunkCount = Swift.max(1, Swift.min(nthreads ?? count, count))
    let chunkSize = (count - 1) / chunkCount + 1

    DispatchQueue.concurrentPerform(iterations: chunkCount) { chunk in
      // Rounding the chunk size up means trailing chunks can start past the
      // end (e.g. 5 elements in 4 chunks of 2); clamp both bounds so such
      // chunks become empty ranges instead of invalid ones.
      let lower = Swift.min(chunk * chunkSize, count)
      let upper = Swift.min(lower + chunkSize, count)
      for idx in lower..<upper {
        let transformed = transform(self[idx])
        result.atomically { $0[idx] = transformed }
      }
    }

    // Every index in `0..<count` belongs to exactly one chunk, so every slot
    // has been filled by now.
    return result.value.map { $0! }
  }
}
// My generic implementation
extension RandomAccessCollection {
  /// Maps `transform` over the collection in parallel and returns the results
  /// in order, i.e. the same array as `self.map(transform)`.
  ///
  /// The collection is divided into `ceil(count / minBatchSize)` batches; when
  /// that yields fewer than two batches the work is done sequentially.
  ///
  /// - Parameters:
  ///   - minBatchSize: Divisor used to choose the number of batches; must be
  ///     at least 1.
  ///   - transform: The mapping applied to each element.
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap2<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
    precondition(minBatchSize >= 1)
    let elementCount = count
    let batches = (elementCount + minBatchSize - 1) / minBatchSize
    guard batches >= 2 else { return map(transform) }

    return [B](unsafeUninitializedCapacity: elementCount) { buffer, initializedCount in
      initializedCount = elementCount
      // At least two batches implies a non-empty buffer, so the base address exists.
      let base = buffer.baseAddress!
      DispatchQueue.concurrentPerform(iterations: batches) { batch in
        // Consecutive batches cover adjacent, non-overlapping offset ranges.
        let lower = batch * elementCount / batches
        let upper = (batch + 1) * elementCount / batches
        var source = index(startIndex, offsetBy: lower)
        for offset in lower..<upper {
          (base + offset).initialize(to: transform(self[source]))
          formIndex(after: &source)
        }
      }
    }
  }
}
// This oughta be an optimization, but doesn't seem to be!
extension Array {
  /// Returns `self.map(transform)`, computed in parallel.
  ///
  /// Forwards to `concurrentMap2` on the buffer pointer obtained from
  /// `withUnsafeBufferPointer`, using that method's default batch size.
  ///
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap3<B>(_ transform: (Element) -> B) -> [B] {
    return withUnsafeBufferPointer { storage in
      storage.concurrentMap2(transform)
    }
  }
}
// Implementation with no unsafe constructs.
extension RandomAccessCollection {
  /// Returns `self.map(transform)`, computed in parallel.
  ///
  /// The collection is divided into `ceil(count / minBatchSize)` batches. Each
  /// batch is mapped into its own array, the arrays are collected in a
  /// `ThreadSafe` container, and finally concatenated in batch order. With
  /// fewer than two batches the work is done sequentially.
  ///
  /// - Parameters:
  ///   - minBatchSize: Divisor used to choose the number of batches; must be
  ///     at least 1. Defaults to 4096.
  ///   - transform: The mapping applied to each element.
  /// - Requires: `transform` is safe to call from multiple threads.
  func concurrentMap4<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
    precondition(minBatchSize >= 1)
    let n = count
    let batchCount = (n + minBatchSize - 1) / minBatchSize
    if batchCount < 2 { return map(transform) }

    // `ThreadSafe` is a reference type, so the binding itself never changes.
    let batches = ThreadSafe(
      ContiguousArray<[B]?>(repeating: nil, count: batchCount))

    /// Index of the first element of batch `b`; `batchStart(batchCount)` is `endIndex`.
    func batchStart(_ b: Int) -> Index {
      index(startIndex, offsetBy: b * n / batchCount)
    }

    DispatchQueue.concurrentPerform(iterations: batchCount) { b in
      // Map the whole batch outside the critical section; only the final
      // store of the finished array is serialized.
      let batch = self[batchStart(b)..<batchStart(b + 1)].map(transform)
      batches.atomically { $0[b] = batch }
    }

    // Every batch index was visited exactly once, so every slot is non-nil.
    return batches.value.flatMap { $0! }
  }
}
/// Times `Array.map` and each `concurrentMap` variant applying `transform` to
/// the integers `0...count`, prints the timings, and aborts if any variant's
/// result differs from the sequential one.
///
/// - Parameters:
///   - count: Upper bound of the input range; the input has `count + 1` elements.
///   - transform: The operation being benchmarked; it is invoked from multiple threads.
func test(count: Int, _ transform: (Int) -> Int) {
  // Honor the requested size instead of a hard-coded range, so callers
  // actually control how much work is benchmarked.
  let hugeArray = Array(0...count)

  /// Runs `f` once and returns the elapsed wall-clock time together with its result.
  func time<R>(_ f: () -> R) -> (time: Double, result: R) {
    let startTime = CACurrentMediaTime()
    let r = f()
    let t = CACurrentMediaTime() - startTime
    return (t, r)
  }

  let (t0, r0) = time { hugeArray.map(transform) }
  print("sequential map time:", t0, "(the one to beat)")
  let (t1, r1) = time { hugeArray.concurrentMap1(transform) }
  print("concurrentMap1 time:", t1)
  let (t2, r2) = time { hugeArray.concurrentMap2(transform) }
  print("concurrentMap2 time:", t2)
  let (t3, r3) = time { hugeArray.concurrentMap3(transform) }
  print("concurrentMap3 time:", t3)
  let (t4, r4) = time { hugeArray.concurrentMap4(transform) }
  print("concurrentMap4 time:", t4)

  // Correctness check: every variant must reproduce the sequential result.
  if r1 != r0 { fatalError("bad implementation 1") }
  if r2 != r0 { fatalError("bad implementation 2") }
  if r3 != r0 { fatalError("bad implementation 3") }
  if r4 != r0 { fatalError("bad implementation 4") }
}
// Base problem size shared by both benchmark sections.
let N = 65536

// Section 1: a trivial transform, so the timings are dominated by overhead.
print("* Testing a fast operation, to show overhead")
test(count: N * 10, { $0 &+ 1 })
print()

// Section 2: each transform call sums a prefix of `bigArray`; the prefix
// length doubles every round, from N/16 up to N.
print("* Testing slow operations")
let bigArray = Array(0...N)
for workloadSize in (0..<5).map({ (N >> 4) << $0 }) {
  let workload = bigArray.prefix(workloadSize)
  print("- worklaod size: ", workload.count)
  test(count: N, { seed in workload.reduce(seed, &+) })
}
@dabrahams

This comment has been minimized.

Copy link
Owner Author

@dabrahams dabrahams commented Mar 13, 2020

These results show that:

% swiftc -O concmap.swift -o /tmp/concmap && /tmp/concmap
* Testing a fast operation, to show overhead
sequential map time: 0.0047810429823584855 (the one to beat)
concurrentMap1 time: 2.6848530589486472
concurrentMap2 time: 0.0006109760142862797
concurrentMap3 time: 0.0018018749542534351
concurrentMap4 time: 0.006668329006060958

* Testing slow operations
- worklaod size:  4096
sequential map time: 0.2821901539573446 (the one to beat)
concurrentMap1 time: 2.7502949939807877
concurrentMap2 time: 0.03729481704067439
concurrentMap3 time: 0.03941335098352283
concurrentMap4 time: 0.03851412795484066
- worklaod size:  8192
sequential map time: 0.8860503709875047 (the one to beat)
concurrentMap1 time: 2.744172142003663
concurrentMap2 time: 0.10571377602173015
concurrentMap3 time: 0.09992507298011333
concurrentMap4 time: 0.10328929399838671
- worklaod size:  16384
sequential map time: 1.588162227999419 (the one to beat)
concurrentMap1 time: 2.6967784829903394
concurrentMap2 time: 0.1963050719932653
concurrentMap3 time: 0.1879734710091725
concurrentMap4 time: 0.20308988098986447
- worklaod size:  32768
sequential map time: 3.226483370002825 (the one to beat)
concurrentMap1 time: 2.6823930320097134
concurrentMap2 time: 0.40369256696430966
concurrentMap3 time: 0.40157766197808087
concurrentMap4 time: 0.4020686270087026
- worklaod size:  65536
sequential map time: 7.184593409008812 (the one to beat)
concurrentMap1 time: 2.820290855015628
concurrentMap2 time: 0.946223387029022
concurrentMap3 time: 0.945886077999603
concurrentMap4 time: 1.0435081740142778
@DenTelezhkin

This comment has been minimized.

Copy link

@DenTelezhkin DenTelezhkin commented Oct 14, 2020

Hi @dabrahams!

Thanks for great implementations and benchmarking!

I wonder if you happen to have any advice on using these asynchronously. In my case, I need concurrent processing for large sets of data, but I also want the interface to stay responsive. My current (very rough) implementation uses concurrentMap2 in the following way:

/// A flag used to ask in-flight work to stop early.
///
/// The flag is read from the worker threads of a parallel map while another
/// thread may set it, so every access goes through a lock.
final class CancellationToken {
    /// Serializes reads and writes of `isCancelled`.
    private let lock = NSLock()
    /// Backing storage for `cancelled`; only touched while `lock` is held.
    private var isCancelled = false

    /// Whether cancellation has been requested.
    var cancelled: Bool {
        get {
            lock.lock()
            defer { lock.unlock() }
            return isCancelled
        }
        set {
            lock.lock()
            defer { lock.unlock() }
            isCancelled = newValue
        }
    }

    /// Requests cancellation.
    func cancel() {
        cancelled = true
    }
}

extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel, skipping batches
    /// that start after `token` has been cancelled.
    ///
    /// - Parameters:
    ///   - minBatchSize: Divisor used to choose the number of batches; must be
    ///     at least 1.
    ///   - token: Checked once at the start of every batch.
    ///   - transform: The mapping applied to each element.
    /// - Requires: `transform` is safe to call from multiple threads.
    /// - Warning: A batch that observes `token.cancelled` returns without
    ///   initializing its slots, although `resultCount` has already been set
    ///   to `n`. The returned array then contains uninitialized memory.
    func concurrentMap<B>(minBatchSize: Int = 4096, token: CancellationToken = .init(), _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        // Fewer than two batches: map sequentially; `token` is not consulted here.
        if batchCount < 2 { return self.map(transform) }
        
        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            // Declares all `n` slots as initialized up front.
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!
            
            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                // NOTE(review): this early exit leaves the batch's slots
                // uninitialized even though `resultCount` is already `n`.
                guard !token.cancelled else {
                    return
                }
                // Offset range of batch `b` within the result buffer.
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress+startOffset..<baseAddress+endOffset {
                    p.initialize(to: transform(self[sourceIndex]))
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }

    /// Runs `concurrentMap` on a global queue of the given `qos` and then
    /// calls `completion` on the main queue with the result and `token`.
    ///
    /// - Parameters:
    ///   - qos: Quality-of-service class of the global queue that does the work.
    ///   - token: Forwarded to `concurrentMap` and handed back to `completion`.
    ///   - transform: The mapping applied to each element.
    ///   - completion: Invoked on the main queue once the map has finished.
    func asyncConcurrentMap<T>(qos: DispatchQoS.QoSClass = .userInitiated, token: CancellationToken, transform: @escaping (Element) -> T, completion: @escaping ([T], CancellationToken) -> ()) {
        DispatchQueue.global(qos: qos).async {
            let result = self.concurrentMap(token: token, transform)
            
            DispatchQueue.main.async {
                completion(result, token)
            }
        }
    }
}

It works fine in my case; however, I'm pretty sure I'm walking in very dangerous territory, since the Array memory is not fully initialized when the operation is cancelled. The documentation clearly states that "the memory in the range buffer[0..<initializedCount] must be initialized at the end of the closure's execution", but I'm not sure how else to cancel this operation.

Would love to hear any of your thoughts on this!

@DenTelezhkin

This comment has been minimized.

Copy link

@DenTelezhkin DenTelezhkin commented Oct 15, 2020

Update: it turns out my implementation was not actually cancelling anything, and when I did cancel, everything crashed, as expected :) So far, the only reasonable thing I have come up with is initializing with bogus values instead of running the expensive transform operation:

extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel, substituting
    /// `bogusValue` for every element visited after `token` is cancelled.
    ///
    /// - Parameters:
    ///   - minBatchSize: Divisor used to choose the number of batches; must be
    ///     at least 1.
    ///   - token: Checked before each element is transformed.
    ///   - bogusValue: Placeholder stored instead of calling `transform` once
    ///     `token` is cancelled, so the result is always fully initialized.
    ///   - transform: The mapping applied to each element.
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap<B>(minBatchSize: Int = 4096, token: CancellationToken = .init(), bogusValue: B, _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        // Fewer than two batches: map sequentially, but still honor the token
        // so small inputs with an expensive transform can be cancelled too.
        if batchCount < 2 {
            return self.map { token.cancelled ? bogusValue : transform($0) }
        }

        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            // Every slot is written below (with a real or a bogus value), so
            // declaring all `n` slots initialized is accurate.
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!

            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                // Offset range of batch `b` within the result buffer.
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress+startOffset..<baseAddress+endOffset {
                    if token.cancelled {
                        p.initialize(to: bogusValue)
                    } else {
                        p.initialize(to: transform(self[sourceIndex]))
                    }
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }

    /// Runs `concurrentMap` on a global queue of the given `qos` and then
    /// calls `completion` on the main queue with the result and `token`.
    ///
    /// - Parameters:
    ///   - qos: Quality-of-service class of the global queue that does the work.
    ///   - token: Forwarded to `concurrentMap` and handed back to `completion`.
    ///   - bogusValue: Placeholder forwarded to `concurrentMap`.
    ///   - transform: The mapping applied to each element.
    ///   - completion: Invoked on the main queue once the map has finished.
    func asyncConcurrentMap<T>(qos: DispatchQoS.QoSClass = .userInitiated, token: CancellationToken, bogusValue: T, transform: @escaping (Element) -> T, completion: @escaping ([T], CancellationToken) -> ()) {
        DispatchQueue.global(qos: qos).async {
            let result = self.concurrentMap(token: token, bogusValue: bogusValue, transform)

            DispatchQueue.main.async {
                completion(result, token)
            }
        }
    }
}

Not a fan of how this API reads, but at least it's safe, doesn't crash, and actually improves performance by not running the transform operation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.