Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@h12w
Created February 6, 2013 08:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save h12w/4721236 to your computer and use it in GitHub Desktop.
Save h12w/4721236 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
in := input_gen()
out1 := stage1_gen(in, 2)
out2 := stage2_gen(out1, 3)
for o := range out2 {
p("output: ", o)
}
}
// input_gen is the pipeline source. It returns an unbuffered channel on which
// a background goroutine sends the integers 0 through 9 in ascending order
// and which that goroutine closes once the last value has been received.
func input_gen() chan int {
	const count = 10

	src := make(chan int)
	produce := func() {
		// Closing signals "no more input" to whoever ranges over src.
		defer close(src)
		for n := 0; n != count; n++ {
			src <- n
		}
	}
	go produce()

	return src
}
// stage1_gen starts pipeline stage 1. It consumes values from in, processes
// each one in its own goroutine with at most limit of them running at a time,
// and returns the channel on which the processed values are delivered. The
// returned channel is closed after in has been closed and every worker has
// finished. Because workers run concurrently, output order is not guaranteed
// to match input order.
func stage1_gen(in <-chan int, limit int) chan int {
	return runStage(1, in, limit)
}

// stage2_gen starts pipeline stage 2. It behaves exactly like stage1_gen
// except that its log lines are labelled "stage 2".
func stage2_gen(in <-chan int, limit int) chan int {
	return runStage(2, in, limit)
}

// runStage is the shared implementation behind the stageN_gen functions.
// stage is only used to label the log lines; in and limit have the same
// meaning as in stage1_gen.
func runStage(stage int, in <-chan int, limit int) chan int {
	out := make(chan int)
	g := NewSyncGroup(limit)

	go func() {
		// Close out only after the input is exhausted and all workers are
		// done, so no worker can ever send on a closed channel.
		defer close(out)
		for v := range in {
			// Blocks while limit workers are already running, which in turn
			// stops this loop from pulling more input.
			g.Enter()
			go func(i int) {
				// The slot is released only after the value has been handed
				// downstream, so a slow consumer also throttles this stage.
				defer g.Done()
				p("start stage", stage, "worker with input", i)
				// Half-second pause; presumably a stand-in for real work.
				time.Sleep(time.Second / 2)
				p("exit stage", stage, "worker with input", i)
				out <- i
			}(v)
		}
		g.Wait()
	}()

	return out
}
// SyncGroup combines a counting semaphore with a wait group: Enter blocks
// while the configured number of workers is already active, and Wait blocks
// until every worker that entered has called Done.
//
// A SyncGroup must be created with NewSyncGroup and must not be copied after
// first use, because it contains a sync.WaitGroup.
type SyncGroup struct {
	l chan struct{}  // semaphore: holds one element per active worker
	w sync.WaitGroup // counts workers that entered but have not called Done
}

// NewSyncGroup returns a SyncGroup that lets at most limit workers be active
// at once. It panics if limit is less than 1: a zero limit would make the
// semaphore unbuffered, so the very first Enter would block forever.
func NewSyncGroup(limit int) *SyncGroup {
	if limit < 1 {
		panic("NewSyncGroup: limit must be at least 1")
	}
	return &SyncGroup{l: make(chan struct{}, limit)}
}

// Enter reserves a worker slot, blocking until one is free, and registers the
// worker with the wait group. Every Enter must be paired with exactly one Done.
func (g *SyncGroup) Enter() {
	g.l <- struct{}{}
	g.w.Add(1)
}

// Done marks the worker as finished and then releases its slot so that a
// blocked Enter can proceed.
func (g *SyncGroup) Done() {
	g.w.Done()
	<-g.l
}

// Wait blocks until every worker that has entered has called Done.
func (g *SyncGroup) Wait() {
	g.w.Wait()
}
// p prints its arguments to standard output on one line, separated by
// spaces, exactly as fmt.Println does. Println's byte count and error are
// deliberately discarded.
func p(v ...interface{}) {
	_, _ = fmt.Println(v...)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment