Skip to content

Instantly share code, notes, and snippets.

@egonelbre
Created July 3, 2014 05:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save egonelbre/1322cf22cc0043d6b276 to your computer and use it in GitHub Desktop.
Save egonelbre/1322cf22cc0043d6b276 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"sync"
)
type errors []error
func (e errors) Error() string {
return e[0].Error()
}
type Pipeline struct {
merr sync.RWMutex
errs errors
once sync.Once
wait sync.WaitGroup
next int
numbers chan int
filtered chan int
}
func NewPipeline() *Pipeline {
p := &Pipeline{}
p.numbers = make(chan int)
p.filtered = make(chan int)
p.wait.Add(3)
go p.generate()
go p.filter()
go p.print()
return p
}
func (p *Pipeline) err(err error) {
p.merr.Lock()
p.errs = append(p.errs, err)
p.merr.Unlock()
}
func (p *Pipeline) killf(format string, a ...interface{}) {
p.err(fmt.Errorf(format, a...))
}
func (p *Pipeline) Err() error {
p.merr.RLock()
errs := p.errs
p.merr.RUnlock()
return errs
}
func (p *Pipeline) shutdown() {
if err := recover(); err != nil {
p.err(fmt.Errorf("panic: %v", err))
}
p.wait.Done()
p.once.Do(func() {
close(p.numbers)
close(p.filtered)
})
}
func (p *Pipeline) Wait() error {
p.wait.Wait()
return p.Err()
}
func (p *Pipeline) generate() {
defer p.shutdown()
for {
p.numbers <- p.next
p.next += 1
}
}
func (p *Pipeline) filter() {
defer p.shutdown()
for v := range p.numbers {
if v%2 == 0 {
p.filtered <- v
}
}
}
func (p *Pipeline) print() {
defer p.shutdown()
for v := range p.filtered {
if v > 10 {
p.killf("the number %v is too big", v)
return
}
fmt.Println(v)
}
}
func main() {
p := NewPipeline()
if err := p.Wait(); err != nil {
fmt.Println(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment