Skip to content

Instantly share code, notes, and snippets.

@1ort
Created January 25, 2023 01:36
Show Gist options
  • Save 1ort/9bcdd1ebc4a803354a0c9e193c794d5d to your computer and use it in GitHub Desktop.
Save 1ort/9bcdd1ebc4a803354a0c9e193c794d5d to your computer and use it in GitHub Desktop.
go pipeline boilerplate using generics
package pipeline
import "errors"
/*
Usage:
myPipeline := NewPipeline(
NewMyEmitter()
)
.Plug(
NewMyStage()
)
.Plug(
NewAnotherMyStage()
)
out := myPipeline.Run()
*/
type PipelineStage[T any] interface {
Run(in chan<- T, done chan<- struct{}) chan<- T
}
type PipelineEmitter[T any] interface {
Run(done chan<- struct{}) chan<- T
}
type Pipeline[T any] struct {
done chan<- struct{}
emitter PipelineEmitter[T]
stages []PipelineStage[T]
}
func NewPipeline[T any](emitter PipelineEmitter[T]) *Pipeline[T] {
return &Pipeline[T]{
done: nil,
emitter: emitter,
stages: make([]PipelineStage[T], 0),
}
}
func (p *Pipeline[T]) Plug(stage PipelineStage[T]) *Pipeline[T] {
p.stages = append(p.stages, stage)
return p
}
func (p *Pipeline[T]) Run() chan<- T {
p.done = make(chan struct{})
out := p.emitter.Run(p.done)
for _, stage := range p.stages {
out = stage.Run(out, p.done)
}
return out
}
func (p *Pipeline[T]) Stop() error {
if p.done == nil {
return errors.New("Only a running pipeline can be stopped")
}
close(p.done)
p.done = nil
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment