Created January 21, 2024 05:37
Save JadenGeller/9f20b280ae1717f2e4443b3be457d680 to your computer and use it in GitHub Desktop.
Parallelize sequential steps in a pipeline with streaming inputs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A single stage of a pipeline that can be advanced by one tick.
///
/// Advancing a stage yields the stage that follows it. That stage is itself a
/// `PipelineStep`, so a chain of stages is described entirely by types.
protocol PipelineStep {
    /// Value handed to every call of `step(with:)`.
    associatedtype Context

    /// The type of stage produced by advancing this one.
    associatedtype Downstream: PipelineStep

    /// Advances this stage once and returns the resulting downstream stage.
    ///
    /// - Parameter context: Shared input for this tick.
    /// - Returns: The stage that follows this one.
    func step(with context: Context) async throws -> Downstream
}
/// A pipeline stage that can report whether it currently has nothing to do.
///
/// NOTE: the protocol name is misspelled ("Pipelinew"). It is kept as-is so
/// existing conformances and generic constraints keep compiling. New code
/// should use the correctly spelled `IdleCheckablePipelineStep` alias below.
protocol IdleCheckablePipelinewStep: PipelineStep {
    /// `true` when this stage has no pending work; callers may stop stepping it.
    var isIdle: Bool { get }
}

/// Correctly spelled name for `IdleCheckablePipelinewStep`.
typealias IdleCheckablePipelineStep = IdleCheckablePipelinewStep
/// A pipeline stage whose values can be merged into one combined stage.
protocol BatchablePipelineStep: PipelineStep {
    /// The stage type that results from combining several `Self` values.
    associatedtype Batched: PipelineStep

    /// Merges `work` into a single stage of type `Batched`.
    ///
    /// - Parameter work: The stages to combine.
    /// - Returns: One stage standing in for all of `work`.
    static func batching(_ work: [Self]) -> Batched
}
/// A complete pipeline that accepts input items and steps into a new value of its own type.
///
/// `Downstream == Self` means each tick yields the next state of the same
/// pipeline. That is what allows a caller to reassign `self` in a loop.
protocol RunnablePipeline: IdleCheckablePipelinewStep where Downstream == Self {
    /// The type of a single input item.
    associatedtype Work

    /// Adds one input item to the pipeline.
    ///
    /// - Parameter work: The item to feed in before the next step.
    mutating func insert(_ work: Work)
}
extension RunnablePipeline {
    /// Feeds every element of `sequence` into the pipeline, then drains it.
    ///
    /// Each element is inserted and then the pipeline is stepped exactly once.
    /// After the input is exhausted, the pipeline keeps stepping until it
    /// reports `isIdle`.
    ///
    /// - Parameters:
    ///   - sequence: Input items, inserted in iteration order.
    ///   - context: Passed unchanged to every step.
    /// - Precondition: The pipeline must be idle when `run` starts.
    /// - Throws: Any error thrown by a step, or `CancellationError` when the
    ///   current task is cancelled before the next step.
    mutating func run(processing sequence: some Sequence<Work>, with context: Context) async throws {
        precondition(isIdle)
        for work in sequence {
            // The input may be long or unbounded; stop feeding it promptly
            // once the enclosing task has been cancelled.
            try Task.checkCancellation()
            insert(work)
            self = try await step(with: context)
        }
        while !isIdle {
            // Without this check, a drain that never reaches idle would
            // spin on regardless of cancellation.
            try Task.checkCancellation()
            self = try await step(with: context)
        }
    }
}
/// Lets an optional stage take part in a pipeline.
///
/// A missing stage steps to a missing downstream stage. A present stage
/// steps normally.
extension Optional: PipelineStep where Wrapped: PipelineStep {
    /// Steps the wrapped stage if there is one; otherwise returns `nil`.
    func step(with context: Wrapped.Context) async throws -> Wrapped.Downstream? {
        switch self {
        case .some(let stage):
            return try await stage.step(with: context)
        case .none:
            return nil
        }
    }
}
/// Idle-checking for optional stages.
extension Optional: IdleCheckablePipelinewStep where Wrapped: IdleCheckablePipelinewStep {
    /// A missing stage is always idle; a present one reports its own state.
    var isIdle: Bool {
        self?.isIdle ?? true
    }
}
/// Batching for optional stages.
extension Optional: BatchablePipelineStep where Wrapped: BatchablePipelineStep {
    /// Discards the `nil` entries of `work` and batches the remaining stages in order.
    ///
    /// - Parameter work: Optional stages; `nil` entries contribute nothing.
    /// - Returns: The wrapped type's batched stage for the non-`nil` entries.
    static func batching(_ work: [Self]) -> Wrapped.Batched {
        let present = work.compactMap { $0 }
        return Wrapped.batching(present)
    }
}
/// The end of a pipeline: always idle, and stepping it returns it unchanged.
struct Terminal: IdleCheckablePipelinewStep {
    /// Always `true`.
    var isIdle: Bool { true }

    /// Returns this terminal stage unchanged; the context carries no data.
    func step(with context: Void) async throws -> Self {
        return self
    }
}
/// Steps a batch of same-typed stages and combines their results into one downstream stage.
///
/// NOTE(review): despite the name, the stages in `batch` are stepped one after
/// another, not concurrently. Running them in a task group would require the
/// child results (and, under strict checking, `Base` and its `Context`) to be
/// `Sendable`, and this type does not currently demand that.
struct ParallelBatch<Base: PipelineStep, Downstream: PipelineStep>: IdleCheckablePipelinewStep {
    /// The stages advanced together on each tick.
    var batch: [Base]

    /// Turns the per-stage results, given in `batch` order, into the downstream stage.
    var collect: ([Base.Downstream]) -> Downstream

    /// Creates a batch whose results are combined by a custom closure.
    ///
    /// - Parameters:
    ///   - batch: The stages to step together.
    ///   - collect: Receives one result per element of `batch`, in order.
    init(_ batch: [Base], collect: @escaping ([Base.Downstream]) -> Downstream) {
        self.batch = batch
        self.collect = collect
    }

    /// Creates a batch whose results are combined with `Base.Downstream.batching(_:)`.
    init(_ batch: [Base]) where Base.Downstream: BatchablePipelineStep, Base.Downstream.Batched == Downstream {
        self.batch = batch
        self.collect = { Base.Downstream.batching($0) }
    }

    /// Steps every stage in `batch` with `context` and passes all results to `collect`.
    ///
    /// `collect` is only ever called with exactly one result per element of
    /// `batch`.
    ///
    /// - Throws: The first error thrown by a stage, or `CancellationError` if
    ///   the current task is cancelled before every stage has been stepped.
    func step(with context: Base.Context) async throws -> Downstream {
        var results: [Base.Downstream] = []
        results.reserveCapacity(batch.count)
        for stage in batch {
            // Throw on cancellation rather than stopping early. Stopping early
            // would hand `collect` fewer results than there are stages and
            // silently drop work.
            try Task.checkCancellation()
            results.append(try await stage.step(with: context))
        }
        return collect(results)
    }

    /// `true` when the batch contains no stages.
    var isIdle: Bool {
        batch.isEmpty
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
Example usage: