Skip to content

Instantly share code, notes, and snippets.

@JadenGeller
Created January 21, 2024 05:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JadenGeller/9f20b280ae1717f2e4443b3be457d680 to your computer and use it in GitHub Desktop.
Save JadenGeller/9f20b280ae1717f2e4443b3be457d680 to your computer and use it in GitHub Desktop.
Parallelize sequential steps in a pipeline with streaming inputs
/// A stage of a pipeline that can be advanced asynchronously.
///
/// Advancing a step yields another `PipelineStep` (its `Downstream`), so
/// stages can be chained one after another.
protocol PipelineStep {
    /// The step produced by a successful call to `step(with:)`.
    associatedtype Downstream: PipelineStep

    /// The external input handed to every call to `step(with:)`.
    associatedtype Context

    /// Advances this step once.
    ///
    /// - Parameter context: The input the conforming type needs in order to run.
    /// - Returns: The downstream step that results from advancing.
    /// - Throws: Whatever error the conforming implementation raises.
    func step(with context: Context) async throws -> Downstream
}
/// A `PipelineStep` that can report whether it is currently idle.
///
/// `RunnablePipeline.run(processing:with:)` keeps stepping a pipeline until
/// this reports `true`.
///
/// NOTE(review): the protocol name looks like it contains a typo
/// ("Pipelinew"); renaming it would require touching every conformance.
protocol IdleCheckablePipelinewStep: PipelineStep {
    /// `true` when the step is idle.
    var isIdle: Bool { get }
}
/// A `PipelineStep` whose instances can be merged into one combined step.
protocol BatchablePipelineStep: PipelineStep {
    /// The step type that represents a group of `Self` values.
    associatedtype Batched: PipelineStep

    /// Combines several steps into a single batched step.
    ///
    /// - Parameter work: The individual steps to combine.
    /// - Returns: One `Batched` step built from `work`.
    static func batching(_ work: [Self]) -> Batched
}
/// A whole pipeline that steps into a new value of its own type and accepts
/// new work between steps.
///
/// Because `Downstream == Self`, the result of `step(with:)` can be assigned
/// straight back over the current pipeline value.
protocol RunnablePipeline: IdleCheckablePipelinewStep where Downstream == Self {
    /// The unit of input accepted by `insert(_:)`.
    associatedtype Work

    /// Adds a unit of work to the pipeline.
    ///
    /// - Parameter work: The work to enqueue.
    mutating func insert(_ work: Work)
}
extension RunnablePipeline {
    /// Drives the pipeline over every element of `sequence`, then keeps
    /// stepping until the pipeline reports that it is idle.
    ///
    /// - Precondition: The pipeline is idle when this is called.
    /// - Parameters:
    ///   - sequence: The work items to feed in, one per step.
    ///   - context: The context passed to every call to `step(with:)`.
    /// - Throws: Rethrows the first error thrown by `step(with:)`.
    mutating func run(processing sequence: some Sequence<Work>, with context: Context) async throws {
        precondition(isIdle)

        // Feed phase: enqueue one item, then advance the pipeline once.
        for item in sequence {
            insert(item)
            self = try await step(with: context)
        }

        // Drain phase: no new input remains, so keep advancing until the
        // in-flight work has left the pipeline.
        while !isIdle {
            self = try await step(with: context)
        }
    }
}
extension Optional: PipelineStep where Wrapped: PipelineStep {
    /// Steps the wrapped value when there is one.
    ///
    /// - Parameter context: The context forwarded to the wrapped step.
    /// - Returns: `nil` when this optional is `nil`; otherwise the wrapped
    ///   step's downstream result.
    /// - Throws: Rethrows any error from the wrapped step.
    func step(with context: Wrapped.Context) async throws -> Wrapped.Downstream? {
        switch self {
        case .none:
            return nil
        case .some(let wrapped):
            return try await wrapped.step(with: context)
        }
    }
}
extension Optional: IdleCheckablePipelinewStep where Wrapped: IdleCheckablePipelinewStep {
    /// `true` when there is no wrapped step; otherwise the wrapped step's own
    /// `isIdle` value.
    var isIdle: Bool {
        guard let wrapped = self else { return true }
        return wrapped.isIdle
    }
}
extension Optional: BatchablePipelineStep where Wrapped: BatchablePipelineStep {
    /// Batches only the non-`nil` steps.
    ///
    /// - Parameter work: Optional steps; `nil` entries are dropped.
    /// - Returns: The result of `Wrapped.batching(_:)` applied to the
    ///   remaining steps, in their original order.
    static func batching(_ work: [Self]) -> Wrapped.Batched {
        let present = work.compactMap { $0 }
        return Wrapped.batching(present)
    }
}
/// A step that does nothing: stepping it returns the same value, and it
/// always reports itself as idle.
struct Terminal: IdleCheckablePipelinewStep {
    /// Always `true`.
    var isIdle: Bool {
        return true
    }

    /// Returns this value unchanged.
    ///
    /// - Parameter context: Unused; the context type is `Void`.
    func step(with context: Void) async throws -> Terminal {
        return self
    }
}
/// A pipeline step that advances a whole batch of `Base` steps and merges
/// their results into a single `Downstream` step.
///
/// NOTE(review): despite the type's name, the elements of `batch` are stepped
/// one after another, in order. Confirm whether concurrent execution within a
/// batch was intended before changing this.
struct ParallelBatch<Base: PipelineStep, Downstream: PipelineStep>: IdleCheckablePipelinewStep {
    /// The steps that will be advanced by the next call to `step(with:)`.
    var batch: [Base]

    /// Merges the per-element results (in `batch` order) into the downstream step.
    var collect: ([Base.Downstream]) -> Downstream

    /// Creates a batch with an explicit merge function.
    ///
    /// - Parameters:
    ///   - batch: The steps to advance together.
    ///   - collect: Builds the downstream step from the elements' results.
    init(_ batch: [Base], collect: @escaping ([Base.Downstream]) -> Downstream) {
        self.batch = batch
        self.collect = collect
    }

    /// Creates a batch whose results are merged with
    /// `Base.Downstream.batching(_:)`.
    ///
    /// - Parameter batch: The steps to advance together.
    init(_ batch: [Base]) where Base.Downstream: BatchablePipelineStep, Base.Downstream.Batched == Downstream {
        self.init(batch, collect: { Base.Downstream.batching($0) })
    }

    /// Steps every element of `batch` with the same context and merges the
    /// results.
    ///
    /// - Parameter context: The context passed to each element's `step(with:)`.
    /// - Returns: `collect` applied to the elements' results, in `batch` order.
    ///   An empty batch yields `collect([])`.
    /// - Throws: `CancellationError` if the task is cancelled before an
    ///   element is started, or the first error thrown by an element.
    func step(with context: Base.Context) async throws -> Downstream {
        var results: [Base.Downstream] = []
        results.reserveCapacity(batch.count)
        for element in batch {
            // Surface cancellation as an error rather than handing `collect`
            // a partial batch: silently dropping the remaining elements would
            // lose work without the caller ever finding out.
            try Task.checkCancellation()
            results.append(try await element.step(with: context))
        }
        return collect(results)
    }

    /// `true` when there are no steps waiting in `batch`.
    var isIdle: Bool {
        batch.isEmpty
    }
}
@JadenGeller
Copy link
Author

Example usage:

/// Example pipeline with three stages: calculate dates, update the date
/// index, and report progress. Each call to `step(with:)` advances all three
/// stages at once, and every stage's output becomes the pending input of the
/// stage after it.
private struct DateIndexingPipeline: RunnablePipeline {
    /// Work waiting for the date-calculation stage.
    var calculateDate: CalculateDate.Batched
    /// Work waiting for the index-update stage.
    var updateDateIndex: UpdateDateIndex.Batched
    /// Work waiting for the progress-report stage, if any.
    var reportProgress: ReportProgress?

    /// The per-stage contexts needed to advance the pipeline once.
    struct Context {
        var calculateDate: CalculateDate.Batched.Context
        var updateStorage: UpdateDateIndex.Batched.Context
        var reportProgress: ReportProgress.Context
    }

    /// Advances all three stages concurrently and returns the pipeline state
    /// for the next round.
    ///
    /// - Parameter context: The contexts for the individual stages.
    /// - Returns: A pipeline whose `calculateDate` stage is empty and whose
    ///   later stages hold the outputs of the stages before them.
    /// - Throws: Any error thrown by one of the three stages.
    func step(with context: Context) async throws -> Self {
        // Each binding is named after the stage that will consume its value.
        async let nextUpdateDateIndex = self.calculateDate.step(with: context.calculateDate)
        async let nextReportProgress = self.updateDateIndex.step(with: context.updateStorage)
        async let progressReported = self.reportProgress?.step(with: context.reportProgress)

        let next = try await Self(
            calculateDate: .init([]),
            updateDateIndex: nextUpdateDateIndex,
            reportProgress: nextReportProgress
        )

        // Await the report explicitly. An `async let` that is never awaited
        // is cancelled when the scope exits and any error it throws is
        // discarded, so the report could be cut short without anyone noticing.
        _ = try await progressReported
        return next
    }

    /// `true` when none of the three stages has pending work.
    var isIdle: Bool {
        calculateDate.isIdle && updateDateIndex.isIdle && reportProgress.isIdle
    }

    /// Queues new assets for the date-calculation stage.
    ///
    /// - Parameter work: `(assetID, asset)` pairs; each is wrapped with
    ///   `CalculateDate.init` and appended to the pending batch.
    mutating func insert(_ work: [(assetID: AssetID, asset: PHAsset)]) {
        calculateDate.batch.append(contentsOf: work.map(CalculateDate.init))
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment