Last active
March 15, 2018 11:00
-
-
Save izumin5210/68f35bdea0b8e72f22f7824b0bc4560a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"reflect" | |
"sort" | |
"sync" | |
"time" | |
) | |
type Dispatcher struct { | |
workerCnt int | |
} | |
func (d *Dispatcher) Run(in <-chan []int, out chan<- []int) { | |
var wg sync.WaitGroup | |
wg.Add(d.workerCnt) | |
for i := 0; i < d.workerCnt; i++ { | |
go func(i int) { | |
defer wg.Done() | |
(&Worker{i: i}).Run(in, out) | |
}(i) | |
} | |
wg.Wait() | |
close(out) | |
} | |
type Worker struct { | |
i int | |
} | |
func (w *Worker) Run(in <-chan []int, out chan<- []int) { | |
for v := range in { | |
time.Sleep(1 * time.Second) | |
fmt.Printf("worker[%d] received %d items\n", w.i, len(v)) | |
out <- v | |
} | |
} | |
type Aggregator struct { | |
} | |
func (a *Aggregator) Run(ctx context.Context, in <-chan int, out chan<- []int) { | |
ticker := time.NewTicker(1 * time.Second) | |
defer ticker.Stop() | |
defer close(out) | |
list := make([]int, 0, 1000) | |
for { | |
select { | |
case v, ok := <-in: | |
if ok { | |
list = append(list, v) | |
if len(list) > 1000 { | |
out <- list | |
list = make([]int, 0, 1000) | |
} | |
} | |
case <-ticker.C: | |
if len(list) > 100 { | |
out <- list | |
list = make([]int, 0, 1000) | |
} | |
case <-ctx.Done(): | |
for v := range in { | |
list = append(list, v) | |
} | |
out <- list | |
return | |
} | |
} | |
} | |
func main() { | |
var got, want []int | |
for i := 0; i < 100000; i++ { | |
want = append(want, i) | |
} | |
var wg sync.WaitGroup | |
queue := make(chan int) | |
workerQueue := make(chan []int) | |
out := make(chan []int) | |
wg.Add(3) | |
// Dispatcher の結果を取り出し | |
go func() { | |
defer wg.Done() | |
for v := range out { | |
got = append(got, v...) | |
} | |
}() | |
// 仕事する人 | |
go func() { | |
defer wg.Done() | |
(&Dispatcher{workerCnt: 8}).Run(workerQueue, out) | |
}() | |
// とりだして aggregate する人 | |
tickerCtx, tickerCancel := context.WithCancel(context.Background()) | |
go func(ctx context.Context) { | |
defer wg.Done() | |
(&Aggregator{}).Run(ctx, queue, workerQueue) | |
}(tickerCtx) | |
// queuing する人(=> ユーザからのリクエスト) | |
n := 50 | |
var queuingWg sync.WaitGroup | |
queuingWg.Add(n) | |
for i := 0; i < n; i++ { | |
go func(list []int) { | |
defer queuingWg.Done() | |
ticker := time.NewTicker(5 * time.Millisecond) | |
defer ticker.Stop() | |
for { | |
if len(list) == 0 { | |
break | |
} | |
select { | |
case <-ticker.C: | |
queue <- list[0] | |
} | |
list = list[1:] | |
} | |
}(want[i*len(want)/n : (i+1)*len(want)/n]) | |
} | |
// queuing 終わったら queue を close | |
// 実アプリ上では "server の graceful shutdown 完了" に相当 | |
queuingWg.Wait() | |
close(queue) | |
fmt.Println("queue closed") | |
// "queue から取り出して worker に突っ込むくん" を close | |
tickerCancel() | |
// worker 終了待ち | |
wg.Wait() | |
// データ整合性チェック | |
if len(got) == len(want) { | |
fmt.Println("valid length") | |
sort.Ints(got) | |
if reflect.DeepEqual(got, want) { | |
fmt.Println("valid data") | |
} else { | |
fmt.Println("invalid data") | |
} | |
} else { | |
fmt.Printf("invalid length: got %d, want %d\n", len(got), len(want)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment