Skip to content

Instantly share code, notes, and snippets.

@h12w
Created February 6, 2013 08:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save h12w/4721246 to your computer and use it in GitHub Desktop.
Save h12w/4721246 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// main wires up a two-stage concurrent pipeline and runs ten inputs through
// it: input feeds stage1 (created with limit 2), which feeds stage2 (created
// with limit 3), which sends each value on out. A dedicated goroutine prints
// everything received from out.
//
// Shutdown order matters:
//  1. wg.Wait returns once every stage worker has finished, which implies
//     every value has been handed to the printer goroutine.
//  2. close(out) ends the printer's range loop.
//  3. <-printed waits until the printer has actually written its last line.
//
// Step 3 is required because out is unbuffered: a send completes as soon as
// the printer *receives* the value, not when it has printed it. Without the
// final wait, main could return (terminating the process) before the last
// "output:" line was written.
func main() {
	// NOTE(review): since Go 1.5 GOMAXPROCS already defaults to the number
	// of CPUs, so this call is redundant on modern toolchains. It is kept to
	// preserve behavior on old ones and to keep the runtime import in use.
	runtime.GOMAXPROCS(runtime.NumCPU())

	out := make(chan int)
	// printed is closed by the printer goroutine after it has drained out.
	printed := make(chan struct{})
	go func() {
		defer close(printed)
		for o := range out {
			p("output:", o)
		}
	}()

	wg := new(sync.WaitGroup)
	input(
		stage1(wg, 2,
			stage2(wg, 3,
				func(o int) {
					out <- o
				})))

	wg.Wait()
	close(out)
	<-printed
}
// input is the pipeline source: it calls next synchronously with each of the
// integers 0 through 9, in ascending order, and returns after the last call.
func input(next func(int)) {
	const count = 10

	n := 0
	for n < count {
		next(n)
		n++
	}
}
// stage1 builds the first pipeline stage and returns the function used to
// feed it.
//
// Each call to the returned function first reserves a slot in a SyncGroup
// created with the given limit and the shared wg (blocking the caller while
// the group is full), then starts a goroutine that logs, sleeps for half a
// second to simulate work, logs again, and passes the value to next.
//
// The slot is released only after next returns, so a worker that is blocked
// handing its value downstream still counts against limit.
func stage1(wg *sync.WaitGroup, limit int, next func(int)) func(int) {
	group := NewSyncGroup(limit, wg)

	// worker processes a single value and forwards it to the next stage.
	worker := func(n int) {
		defer group.Done()
		p("start stage 1 worker with input", n)
		time.Sleep(500 * time.Millisecond)
		p("exit stage 1 worker with input", n)
		next(n)
	}

	return func(v int) {
		group.Enter()
		go worker(v)
	}
}
// stage2 builds the second pipeline stage and returns the function used to
// feed it.
//
// Each call to the returned function first reserves a slot in a SyncGroup
// created with the given limit and the shared wg (blocking the caller while
// the group is full), then starts a goroutine that logs, sleeps for half a
// second to simulate work, logs again, and passes the value to next.
//
// The slot is released only after next returns, so a worker that is blocked
// handing its value downstream still counts against limit.
func stage2(wg *sync.WaitGroup, limit int, next func(int)) func(int) {
	group := NewSyncGroup(limit, wg)

	// worker processes a single value and forwards it to the next stage.
	worker := func(n int) {
		defer group.Done()
		p("start stage 2 worker with input", n)
		time.Sleep(500 * time.Millisecond)
		p("exit stage 2 worker with input", n)
		next(n)
	}

	return func(v int) {
		group.Enter()
		go worker(v)
	}
}
// SyncGroup pairs a counting semaphore with a caller-supplied WaitGroup.
// Enter blocks while limit holders are active and registers the new holder
// with the WaitGroup; Done undoes both. Several SyncGroups may share one
// WaitGroup so that a single Wait covers all of them.
type SyncGroup struct {
	// l is the semaphore: each buffered element represents one active holder.
	l chan struct{}
	// w is the (possibly shared) WaitGroup tracking active holders.
	w *sync.WaitGroup
}

// NewSyncGroup returns a SyncGroup that admits at most limit concurrent
// holders and accounts for them on wg.
//
// limit is used directly as the channel capacity, so it should be positive:
// with 0 the channel is unbuffered and Enter blocks until something receives
// from it, and a negative value makes the underlying make call panic.
func NewSyncGroup(limit int, wg *sync.WaitGroup) *SyncGroup {
	slots := make(chan struct{}, limit)
	return &SyncGroup{l: slots, w: wg}
}

// Enter reserves a slot, blocking while all slots are taken, and then
// increments the WaitGroup counter.
func (g *SyncGroup) Enter() {
	var token struct{}
	g.l <- token
	g.w.Add(1)
}

// Done decrements the WaitGroup counter and then frees the slot reserved by
// the matching Enter.
func (g *SyncGroup) Done() {
	g.w.Done()
	<-g.l
}

// Wait blocks until the underlying WaitGroup's counter reaches zero.
func (g *SyncGroup) Wait() { g.w.Wait() }
// p is a shorthand for fmt.Println: it writes its arguments to standard
// output separated by spaces and followed by a newline. The byte count and
// error reported by fmt.Println are discarded.
func p(args ...interface{}) {
	_, _ = fmt.Println(args...)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment