secret
Last active

  • Download Gist
widget-pipeline.go
Go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
package main
 
import (
"fmt"
"os"
)
 
// Pipeline is an ordered list of processing stages together with the two
// channels that bound it.
type Pipeline struct {
	// Emitter is the channel the first stage reads from; Drain is the channel
	// intended to carry the output of the final stage.
	Emitter, Drain chan *Widget

	// processes holds the stages in the order they were registered with Add.
	// Each stage receives an input channel and an output channel.
	processes []func(chan *Widget, chan *Widget)
}
 
// NewPipeline allocates an empty Pipeline with fresh unbuffered Emitter and
// Drain channels. For convenience the same two channels are also returned
// directly, in that order, after the pipeline itself.
func NewPipeline() (*Pipeline, chan *Widget, chan *Widget) {
	pipe := &Pipeline{
		Emitter: make(chan *Widget),
		Drain:   make(chan *Widget),
	}
	return pipe, pipe.Emitter, pipe.Drain
}
 
// Add registers process as the next stage of the pipeline. Stages are kept
// in registration order; nothing is started until Execute is called.
func (pipe *Pipeline) Add(process func(chan *Widget, chan *Widget)) {
	stages := append(pipe.processes, process)
	pipe.processes = stages
}
 
// Execute wires the registered stages into a chain and starts each one in
// its own goroutine. It returns immediately after the goroutines are started.
//
// The first stage reads from pipe.Emitter, every later stage reads from the
// channel the previous stage writes to, and the final stage writes to
// pipe.Drain. Intermediate channels are unbuffered. The channel pair given
// to each stage is logged to os.Stderr.
//
// If no stages have been added, Emitter is forwarded straight to Drain so
// that an empty pipeline acts as the identity instead of stalling both ends.
func (pipe *Pipeline) Execute() {
	last := len(pipe.processes) - 1
	if last < 0 {
		go func(in <-chan *Widget, out chan<- *Widget) {
			for widget := range in {
				out <- widget
			}
			close(out)
		}(pipe.Emitter, pipe.Drain)
		return
	}

	in := pipe.Emitter
	for i, process := range pipe.processes {
		// Only the final stage (index len-1, not len) may write to Drain;
		// this also covers a single-stage pipeline, where first == last.
		out := pipe.Drain
		if i < last {
			out = make(chan *Widget)
		}
		fmt.Fprintln(os.Stderr, "Pipeline reading from", in, "writing to", out)
		go process(in, out)
		// The next stage consumes what this one produces.
		in = out
	}
}
 
// Widget is the item passed through the pipeline. Each flag is set by the
// stage of the same name (whizWidgets, popWidgets, bangWidgets).
type Widget struct {
	Whiz, Pop, Bang bool
}
 
// whizWidgets is a pipeline stage: it sets Whiz on every widget received
// from chi, forwards the widget on cho, and closes cho once chi is closed
// and fully consumed.
var whizWidgets = func(chi chan *Widget, cho chan *Widget) {
	for {
		w, ok := <-chi
		if !ok {
			break
		}
		w.Whiz = true
		cho <- w
	}
	close(cho)
}
 
// popWidgets is a pipeline stage: it sets Pop on every widget received from
// chi, forwards the widget on cho, and closes cho once chi is closed and
// fully consumed.
var popWidgets = func(chi chan *Widget, cho chan *Widget) {
	for {
		w, ok := <-chi
		if !ok {
			break
		}
		w.Pop = true
		cho <- w
	}
	close(cho)
}
 
// bangWidgets is a pipeline stage: it sets Bang on every widget received
// from chi, forwards the widget on cho, and closes cho once chi is closed
// and fully consumed.
var bangWidgets = func(chi chan *Widget, cho chan *Widget) {
	for {
		w, ok := <-chi
		if !ok {
			break
		}
		w.Bang = true
		cho <- w
	}
	close(cho)
}
 
func main() {
 
p, e, d := NewPipeline()
 
fmt.Fprintln(os.Stderr, "Input Emitted On", e)
fmt.Fprintln(os.Stderr, "Output Will Drain From", d)
 
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)
 
go emit(e)
p.Execute()
drain(d)
 
}
 
// emit sends 1000 freshly allocated, zero-valued widgets on cho and then
// closes cho to signal that no more input will arrive.
func emit(cho chan *Widget) {
	const total = 1000
	for sent := 0; sent != total; sent++ {
		cho <- new(Widget)
	}
	close(cho)
}
 
// drain prints every widget received from chi to standard output, one per
// line, and returns once chi has been closed and emptied.
func drain(chi chan *Widget) {
	for {
		widget, open := <-chi
		if !open {
			return
		}
		fmt.Println(widget)
	}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.