Skip to content

Instantly share code, notes, and snippets.

@FZambia
Created August 21, 2015 09:34
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FZambia/c40ba5c85266848532f7 to your computer and use it in GitHub Desktop.
Save FZambia/c40ba5c85266848532f7 to your computer and use it in GitHub Desktop.
Go Concurrency Patterns: Pipelines and cancellation - final code from blog post as gist
package main
import (
"fmt"
"sync"
)
// gen is the source stage of the pipeline. It starts a goroutine that sends
// the values of nums, in order, on the returned unbuffered channel. The
// channel is closed once every value has been sent, or as soon as a receive
// from done succeeds (for example because done was closed), whichever comes
// first.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	stream := make(chan int)
	emit := func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			select {
			case <-done:
				// Cancelled: stop without sending the remaining values.
				return
			case stream <- nums[i]:
			}
		}
	}
	go emit()
	return stream
}
// sq is a pipeline stage that squares integers. It starts a goroutine that
// receives values from in and sends each value multiplied by itself on the
// returned unbuffered channel. The returned channel is closed when in is
// closed and drained, or when a receive from done succeeds while a result is
// waiting to be sent.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			v, ok := <-in
			if !ok {
				// Upstream is closed and fully drained.
				return
			}
			select {
			case <-done:
				// Cancelled: drop the pending result and stop.
				return
			case squares <- v * v:
			}
		}
	}()
	return squares
}
// merge fans in the channels in cs: every value received from any of them is
// forwarded to the single returned channel. The returned channel is closed
// once all forwarding goroutines have finished, which happens when their
// input channel is closed and drained or when a receive from done succeeds
// while a value is waiting to be forwarded.
//
// A forwarding goroutine only checks done while it has a value to send, so
// the input channels are expected to be closed by their senders.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed or done is signalled.
	output := func(c <-chan int) {
		// Deferred so the WaitGroup is released on the early return below
		// as well; otherwise the closer goroutine would wait forever and
		// out would never be closed after cancellation.
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
// sqWorkerNum is the number of sq stages that main starts on the shared
// input channel.
const sqWorkerNum = 10
// main builds the pipeline gen -> sqWorkerNum parallel sq stages -> merge
// for the inputs 2 and 3 and prints every merged result on its own line.
// The results arrive in no particular order.
func main() {
	// A single done channel is shared by the whole pipeline. It is closed
	// when main returns, as a signal for the goroutines started below to
	// exit.
	done := make(chan struct{})
	defer close(done)

	source := gen(done, 2, 3)

	// Fan out: every sq stage reads from the same source channel, so the
	// squaring work is distributed across sqWorkerNum goroutines.
	workers := make([]<-chan int, 0, sqWorkerNum)
	for len(workers) < sqWorkerNum {
		workers = append(workers, sq(done, source))
	}

	// Fan in: consume the merged output until it is closed.
	for n := range merge(done, workers...) {
		fmt.Println(n)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment