Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Swift async/await implementation of a parallel map
extension Collection {
func parallelMap<T>(
parallelism requestedParallelism: Int? = nil,
_ transform: @escaping (Element) async throws -> T
) async throws -> [T] {
let defaultParallelism = 2
let parallelism = requestedParallelism ?? defaultParallelism
let n = self.count
if n == 0 {
return []
}
return await try Task.withGroup(resultType: (Int, T).self) { group in
var result = Array<T?>(repeatElement(nil, count: n))
var i = self.startIndex
var submitted = 0
func submitNext() async throws {
if i == self.endIndex { return }
await group.add { [submitted, i] in
let value = await try transform(self[i])
return (submitted, value)
}
submitted += 1
formIndex(after: &i)
}
// submit first initial tasks
for _ in 0..<parallelism {
await try submitNext()
}
// as each task completes, submit a new task until we run out of work
while let (index, taskResult) = await try! group.next() {
result[index] = taskResult
await try Task.checkCancellation()
await try submitNext()
}
assert(result.count == n)
return Array(result.compactMap { $0 })
}
}
}
func getSimpleArray(n: Int) -> [Int] {
var array = [Int]()
for i in 0..<n {
array.append(i)
}
return array
}
runAsyncAndBlock {
let array = getSimpleArray(n: 100)
let resultArray: [String] = await try! array.parallelMap(parallelism: 4) { element in
print("Transforming \(element)")
return String(element * 10)
}
print(resultArray)
}
@wilg
Copy link

wilg commented Nov 3, 2021

I updated this to Swift 5.5. https://gist.github.com/wilg/47a04c8f5083a6938da6087f77333784

Is this still the best way to do this? Or is there something I'm missing in the standard library. (For parallel map or parallel each)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment