Skip to content

Instantly share code, notes, and snippets.

@h4ckm03d
Created September 15, 2014 09:35
Show Gist options
  • Save h4ckm03d/dbc2c6a75950c29d3da6 to your computer and use it in GitHub Desktop.
Save h4ckm03d/dbc2c6a75950c29d3da6 to your computer and use it in GitHub Desktop.
package main
import "fmt"
// Pipe is a single processing stage that can be linked into a Pipeline.
type Pipe interface {
	// Process takes the channel the stage reads from and returns the
	// channel on which the stage delivers its results.
	Process(src chan int) chan int
}
// Sq is a pipeline stage that squares every value passing through it.
type Sq struct{}

// Process starts a goroutine that receives values from in and sends the
// square of each one on the returned unbuffered channel. The returned
// channel is closed once in has been closed and fully drained.
func (Sq) Process(in chan int) chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			v, ok := <-in
			if !ok {
				// Input closed and drained: nothing more to emit.
				return
			}
			squares <- v * v
		}
	}()
	return squares
}
// Add is a pipeline stage that adds every value to itself.
type Add struct{}

// Process starts a goroutine that receives values from in and sends v + v
// for each one on the returned unbuffered channel. The returned channel is
// closed after in has been closed and fully drained.
func (Add) Process(in chan int) chan int {
	sums := make(chan int)
	go func() {
		defer close(sums)
		for v := range in {
			sums <- v + v
		}
	}()
	return sums
}
// NewPipeline chains the given stages in order and returns a Pipeline whose
// head feeds the first stage and whose tail is the output of the last stage.
//
// Each stage's Process method is called exactly once, receiving the previous
// stage's output channel (the head channel for the first stage). When no
// stages are given the tail is the head itself, so the pipeline passes values
// through unchanged instead of exposing a nil tail that would block every
// Enqueue and Dequeue forever.
func NewPipeline(pipes ...Pipe) Pipeline {
	head := make(chan int)
	tail := head
	for _, pipe := range pipes {
		tail = pipe.Process(tail)
	}
	return Pipeline{head: head, tail: tail}
}
// Pipeline exposes a chain of stages through two channels: items are sent
// into head and results are received from tail.
type Pipeline struct {
	head, tail chan int
}

// Enqueue sends item on the pipeline's head channel.
func (p *Pipeline) Enqueue(item int) {
	p.head <- item
}

// Dequeue receives values from the pipeline's tail channel and calls handler
// with each one, in order. It returns once the tail channel has been closed
// and drained.
func (p *Pipeline) Dequeue(handler func(int)) {
	for {
		v, ok := <-p.tail
		if !ok {
			return
		}
		handler(v)
	}
}

// Close closes the head channel, signalling that no more items will be
// enqueued.
func (p *Pipeline) Close() {
	close(p.head)
}
// main builds a pipeline of an Sq stage followed by an Add stage, feeds it
// the integers 0 through 9 from a separate goroutine, and prints every result
// that comes out of the pipeline.
func main() {
	pipeline := NewPipeline(Sq{}, Add{})

	// Producer: runs concurrently so the consumer below can receive results
	// while items are still being sent.
	produce := func() {
		defer pipeline.Close()
		for n := 0; n < 10; n++ {
			fmt.Printf("Sending: %v\n", n)
			pipeline.Enqueue(n)
		}
		fmt.Println("Closing Pipeline.")
	}
	go produce()

	// Consumer: returns once Dequeue has finished receiving results.
	report := func(result int) {
		fmt.Printf("Received: %v\n", result)
	}
	pipeline.Dequeue(report)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment