Skip to content

Instantly share code, notes, and snippets.

@fabianfett
Last active March 30, 2020 17:08
Show Gist options
  • Save fabianfett/19b2398254df7c912bec871a6837dce2 to your computer and use it in GitHub Desktop.
Save fabianfett/19b2398254df7c912bec871a6837dce2 to your computer and use it in GitHub Desktop.
Run a number of tasks on a SwiftNIO EventLoop with a bounded level of parallelism
import NIO
extension EventLoop {
  /// Runs `process` once for every element of `input`, keeping at most `parallel`
  /// invocations in flight at any time.
  ///
  /// All bookkeeping is done on this event loop; when called from elsewhere the call
  /// is first re-submitted onto the loop. A failing task does not abort the others:
  /// every outcome is captured as a `Result`.
  ///
  /// - Parameters:
  ///   - input: The elements to process.
  ///   - parallel: Maximum number of tasks allowed to run concurrently. Values below 1
  ///     are treated as 1 so that the returned future always completes.
  ///   - process: Starts the work for a single element. It receives the element and
  ///     this event loop; its result is hopped back onto this loop before being recorded.
  /// - Returns: A future that succeeds with one `Result` per input element, in input
  ///   order, once every task has completed.
  public func process<Input, Output>(
    input: [Input],
    parallel: Int,
    process: @escaping (Input, EventLoop) -> EventLoopFuture<Output>)
    -> EventLoopFuture<[Result<Output, Error>]>
  {
    // The mutable state below is not synchronized, so it must only be touched on the loop.
    guard self.inEventLoop else {
      return self.flatSubmit {
        self.process(input: input, parallel: parallel, process: process)
      }
    }

    // A limit of zero (or less) could never start a task and would leave the
    // promise unfulfilled forever; degrade to sequential execution instead.
    let limit = Swift.max(1, parallel)

    let promise = self.makePromise(of: [Result<Output, Error>].self)
    var results = [Result<Output, Error>?](repeating: nil, count: input.count)
    var nextIndex = input.startIndex
    var running = 0
    var isScheduling = false

    func scheduleNextExecution() {
      // Re-entrancy guard: a task whose future is already complete may run its
      // callback synchronously while we are still inside the loop below. In that
      // case the outer invocation picks up the freed slot, which keeps the stack
      // depth constant instead of growing with the number of inputs.
      guard !isScheduling else {
        return
      }
      isScheduling = true
      defer { isScheduling = false }

      // Fill every free slot while there is still input left.
      while running < limit, nextIndex < input.endIndex {
        let index = nextIndex
        nextIndex += 1
        running += 1

        process(input[index], self)
          .hop(to: self)
          .whenComplete { (outcome: Result<Output, Error>) in
            running -= 1
            results[index] = outcome
            scheduleNextExecution()
          }
      }

      // Everything has been started and nothing is in flight any more: done.
      // This is reached exactly once, either from the initial call (empty input or
      // all tasks completed synchronously) or from the last completion callback.
      if running == 0, nextIndex == input.endIndex {
        // Every slot was written by its task's completion callback, so the
        // force-unwrap documents an invariant rather than a real possibility of nil.
        promise.succeed(results.map { $0! })
      }
    }

    scheduleNextExecution()
    return promise.futureResult
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment