// Concurrent collection wrapper

import Dispatch
// Import C11 for atomic_flag
// FIXME: SWIFT(canImport)
//#if canImport(Glibc)
// import Glibc.stdatomic
//#elseif canImport(Darwin)
import Darwin.C.stdatomic
//#endif
extension RandomAccessCollection {
    /// Returns a view of this collection with concurrent operations
    /// such as 'forEach', 'map', 'filter', etc.
    ///
    public var concurrent: ConcurrentCollection<Self> {
        return ConcurrentCollection(viewing: self)
    }
}
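
// Example usage (an illustrative sketch, not part of the original gist; the names
// '_exampleConcurrentMap', 'numbers', and 'squares' are hypothetical). It squares
// each element concurrently; results come back in the original element order.
func _exampleConcurrentMap() {
    let numbers = Array(0..<1_000)
    let squares = numbers.concurrent.map { $0 * $0 }
    assert(squares.count == numbers.count)
    assert(squares[10] == 100)
}
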
public struct ConcurrentCollection<C>: RandomAccessCollection where C: RandomAccessCollection {
    @_versioned let wrapped: C

    @_transparent
    public init(viewing wrapped: C) { self.wrapped = wrapped }

    // Indexes
    @_transparent
    public var startIndex: C.Index { return wrapped.startIndex }
    @_transparent
    public var endIndex: C.Index { return wrapped.endIndex }
    @_transparent
    public var count: C.IndexDistance { return wrapped.count }
    @_transparent
    public func index(before i: C.Index) -> C.Index {
        return wrapped.index(before: i)
    }
    @_transparent
    public func index(after i: C.Index) -> C.Index {
        return wrapped.index(after: i)
    }
    @_transparent
    public func index(_ i: C.Index, offsetBy n: C.IndexDistance) -> C.Index {
        return wrapped.index(i, offsetBy: n)
    }
    @_transparent
    public func index(_ i: C.Index, offsetBy n: C.IndexDistance, limitedBy limit: C.Index) -> C.Index? {
        return wrapped.index(i, offsetBy: n, limitedBy: limit)
    }

    // Iterator
    @_transparent
    public func makeIterator() -> C.Iterator {
        return wrapped.makeIterator()
    }

    // Subscript
    public subscript(index: C.Index) -> C.Iterator.Element {
        @_transparent
        get { return wrapped[index] }
    }
}
// Primitive concurrent operations.
extension ConcurrentCollection {

    // This is basically some compiler hackery exploiting a flawed implementation
    // of 'rethrows'. But hey, it's what DispatchQueue.sync does internally...

    /// Concurrently executes the body with the integers from 0..<count.
    ///
    /// If multiple errors are thrown, the one which gets re-thrown is not
    /// guaranteed to be the 'first' or 'last' one.
    ///
    @inline(__always)
    fileprivate func _forEach(_ body: (Int) throws -> Void, rethrowHack: (Error) throws -> Never) rethrows {
        var result = Error?.none
        var hasResult = atomic_flag()
        DispatchQueue.concurrentPerform(iterations: numericCast(count)) {
            guard result == nil else { return }
            do { try body($0) }
            catch { if atomic_flag_test_and_set(&hasResult) == false { result = error } }
        }
        if let error = result { try rethrowHack(error) }
    }

    /// Concurrently executes the block with the integers from 0..<count.
    ///
    /// If multiple errors are thrown, the one which gets re-thrown is not
    /// guaranteed to be the 'first' or 'last' one.
    ///
    /// Execution happens in batches.
    ///
    @inline(__always)
    fileprivate func _batchedForEach<T>(_ body: (Int) throws -> T?, rethrowHack: (Error) throws -> Never) rethrows -> T? {
        let batchSize = 32
        var start = 0
        var end = Swift.min(start + batchSize, numericCast(count))
        // If empty, skip processing.
        guard start != end else { return .none }
        var result = _FailableResult<T>?.none
        var hasResult = atomic_flag()
        repeat {
            // Concurrently execute over the slice.
            DispatchQueue.concurrentPerform(iterations: end - start) {
                do {
                    if let val = try body(start + $0), atomic_flag_test_and_set(&hasResult) == false {
                        result = .result(val)
                    }
                }
                catch {
                    if atomic_flag_test_and_set(&hasResult) == false {
                        result = .error(error)
                    }
                }
            }
            // If we have a result, return/rethrow it.
            guard atomic_flag_test_and_set(&hasResult) == false else {
                switch result! {
                case .result(let r): return r
                case .error(let e): try rethrowHack(e)
                }
            }
            // Clear the flag (we just set-it-to-check-it), and advance the slice.
            atomic_flag_clear(&hasResult)
            start = end
            end = Swift.min(start + batchSize, numericCast(count))
        } while start != end
        // The closure never returned a value and did not throw.
        return .none
    }

    private enum _FailableResult<Result> {
        case result(Result)
        case error(Error)
    }
}
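
// A minimal standalone sketch (an assumption for illustration, not part of the
// original gist) of the "first error wins" pattern used by '_forEach' above:
// 'atomic_flag_test_and_set' returns the flag's previous value, so only the
// first iteration to set the flag gets to record its error.
func _exampleFirstErrorWins() {
    struct Failure: Error { let iteration: Int }
    var recorded = Error?.none
    var flag = atomic_flag()
    DispatchQueue.concurrentPerform(iterations: 8) { i in
        if atomic_flag_test_and_set(&flag) == false {
            recorded = Failure(iteration: i) // exactly one iteration records its error
        }
    }
    print(recorded as Any)
}
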
// Concurrent algorithm implementations.
extension ConcurrentCollection {

    // Executes the body concurrently for every item.
    public func forEach(_ body: (C.Iterator.Element) throws -> Void) rethrows {
        try _forEach({
            let idx = index(startIndex, offsetBy: numericCast($0))
            try body(self[idx])
        }, rethrowHack: { throw $0 })
    }

    // Executes the __non-throwing__ transform concurrently, stores the results into an Array.
    public func map<T>(_ transform: (C.Iterator.Element) -> T) -> [T] {
        let n = numericCast(count) as Int
        let buffer = UnsafeMutablePointer<T>.allocate(capacity: n)
        defer { buffer.deinitialize(count: n); buffer.deallocate(capacity: n) }
        _forEach({
            let idx = index(startIndex, offsetBy: numericCast($0))
            (buffer + $0).initialize(to: transform(self[idx]))
        }, rethrowHack: { _ in preconditionFailure() })
        // FIXME: SWIFT(unsafe-array-init):
        // Unfortunately, there is no way to unsafely initialise the contents of an Array<T>,
        // so we have to initialise an UnsafeMutablePointer and copy the contents into an Array.
        // https://bugs.swift.org/browse/SR-3087
        return Array(UnsafeMutableBufferPointer<T>(start: buffer, count: n))
    }

    // Executes the __throwing__ transform concurrently, stores the results into an Array.
    public func map<T>(_ transform: (C.Iterator.Element) throws -> T) rethrows -> [T] {
        let n = numericCast(count) as Int
        // Since the closure may throw and we're not processing linearly,
        // _any_ elements may be left uninitialised in the event of an error.
        //
        // Thus, we allocate a raw buffer, initialise it to Optional<T>.none
        // (which we can still deinit safely if the closure throws), and adjust the layout later.
        let rawBuffer = UnsafeMutableRawPointer.allocate(bytes: n * MemoryLayout<Optional<T>>.stride,
                                                         alignedTo: MemoryLayout<Optional<T>>.alignment)
        defer { rawBuffer.deallocate(bytes: n * MemoryLayout<Optional<T>>.stride,
                                     alignedTo: MemoryLayout<Optional<T>>.alignment) }
        // Initialise the buffer to nil.
        let optBuffer = rawBuffer.bindMemory(to: Optional<T>.self, capacity: n)
        optBuffer.initialize(to: .none, count: n)
        // Concurrently execute the block.
        // If it throws, deinitialise everything and rethrow.
        try _forEach({
            let idx = index(startIndex, offsetBy: numericCast($0))
            (optBuffer + $0).initialize(to: try transform(self[idx]))
        }, rethrowHack: { optBuffer.deinitialize(count: n); throw $0 })
        // Remove the optional wrapping in-place.
        optBuffer.withMemoryRebound(to: T.self, capacity: n) { rebound in
            for offset in 0..<n {
                rebound.advanced(by: offset).initialize(to: optBuffer.advanced(by: offset).move()!)
            }
        }
        // Re-bind the memory to T, now that it is layout-compatible.
        let buffer = rawBuffer.bindMemory(to: T.self, capacity: n)
        defer { buffer.deinitialize(count: n) }
        // FIXME: SWIFT(unsafe-array-init): copies the buffer.
        return Array(UnsafeMutableBufferPointer<T>(start: buffer, count: n))
    }

    // Executes the transform concurrently, stores non-nil results into an Array.
    public func flatMap<T>(_ transform: (C.Iterator.Element) throws -> T?) rethrows -> [T] {
        let n = numericCast(self.count) as Int
        // Similarly to the throwing 'map', allocate a raw buffer, initialise it with Optional<T>,
        // and adjust the layout later.
        let rawBuffer = UnsafeMutableRawPointer.allocate(bytes: n * MemoryLayout<Optional<T>>.stride,
                                                         alignedTo: MemoryLayout<Optional<T>>.alignment)
        defer { rawBuffer.deallocate(bytes: n * MemoryLayout<Optional<T>>.stride,
                                     alignedTo: MemoryLayout<Optional<T>>.alignment) }
        // Initialise the buffer to nil.
        let optBuffer = rawBuffer.bindMemory(to: Optional<T>.self, capacity: n)
        optBuffer.initialize(to: .none, count: n)
        // Concurrently execute the block. If it throws, deinitialise everything.
        try _forEach({
            let idx = index(startIndex, offsetBy: numericCast($0))
            (optBuffer + $0).initialize(to: try transform(self[idx]))
        }, rethrowHack: { optBuffer.deinitialize(count: n); throw $0 })
        // Remove the optional wrapping in-place, consuming 'nil' values.
        let count = optBuffer.withMemoryRebound(to: T.self, capacity: n) { rebound -> Int in
            var ptr = rebound
            (0..<n).forEach { offset in
                guard let i = optBuffer.advanced(by: offset).move() else { return }
                ptr.initialize(to: i)
                ptr = ptr.advanced(by: 1)
            }
            return rebound.distance(to: ptr)
        }
        // Re-bind the memory to T, now that it is layout-compatible.
        let buffer = rawBuffer.bindMemory(to: T.self, capacity: count)
        defer { buffer.deinitialize(count: count) }
        // FIXME: SWIFT(unsafe-array-init): copies the buffer.
        return Array(UnsafeBufferPointer(start: buffer, count: count))
    }

    // Executes the filter concurrently, stores affirmative results into an Array.
    public func filter(_ isIncluded: (C.Iterator.Element) throws -> Bool) rethrows -> [C.Iterator.Element] {
        return try flatMap { try isIncluded($0) ? $0 : nil }
    }

    // Executes the predicate concurrently, returns `true` if the predicate returns
    // affirmatively for at least one element.
    public func contains(where predicate: (C.Iterator.Element) throws -> Bool) rethrows -> Bool {
        return try nil != _batchedForEach({ off -> Bool? in
            let idx = index(startIndex, offsetBy: numericCast(off))
            return try predicate(self[idx]) ? true : .none
        }, rethrowHack: { throw $0 })
    }
}
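
// Example of error propagation through the throwing overloads (an illustrative
// sketch, not part of the original gist; 'ParseError', '_exampleThrowingOperations',
// and the input strings are hypothetical).
struct ParseError: Error {}

func _exampleThrowingOperations() {
    let strings = ["1", "2", "3", "oops", "5"]
    do {
        // 'map' rethrows one of the errors recorded by the concurrent iterations.
        let values = try strings.concurrent.map { (s: String) -> Int in
            guard let i = Int(s) else { throw ParseError() }
            return i
        }
        print(values)
    } catch {
        print("parsing failed:", error)
    }
    // 'contains(where:)' runs in batches and can stop early once a batch finds a match.
    let hasNonNumeric = strings.concurrent.contains { Int($0) == nil }
    assert(hasNonNumeric)
}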