Created
September 15, 2014 09:35
-
-
Save h4ckm03d/dbc2c6a75950c29d3da6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "fmt" | |
type Pipe interface { | |
Process(in chan int) chan int | |
} | |
type Sq struct{} | |
func (sq Sq) Process(in chan int) chan int { | |
out := make(chan int) | |
go func() { | |
for i := range in { | |
out <- i * i | |
} | |
close(out) | |
}() | |
return out | |
} | |
type Add struct{} | |
func (add Add) Process(in chan int) chan int { | |
out := make(chan int) | |
go func() { | |
for i := range in { | |
out <- i + i | |
} | |
close(out) | |
}() | |
return out | |
} | |
func NewPipeline(pipes ...Pipe) Pipeline { | |
head := make(chan int) | |
var next_chan chan int | |
for _, pipe := range pipes { | |
if next_chan == nil { | |
next_chan = pipe.Process(head) | |
} else { | |
next_chan = pipe.Process(next_chan) | |
} | |
} | |
return Pipeline{head: head, tail: next_chan} | |
} | |
type Pipeline struct { | |
head chan int | |
tail chan int | |
} | |
func (p *Pipeline) Enqueue(item int) { | |
p.head <- item | |
} | |
func (p *Pipeline) Dequeue(handler func(int)) { | |
for i := range p.tail { | |
handler(i) | |
} | |
} | |
func (p *Pipeline) Close() { | |
close(p.head) | |
} | |
func main() { | |
pipeline := NewPipeline(Sq{}, Add{}) | |
go func() { | |
for i := 0; i < 10; i++ { | |
fmt.Printf("Sending: %v\n", i) | |
pipeline.Enqueue(i) | |
} | |
fmt.Println("Closing Pipeline.") | |
pipeline.Close() | |
}() | |
pipeline.Dequeue(func(i int) { | |
fmt.Printf("Received: %v\n", i) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment