Skip to content

Instantly share code, notes, and snippets.

@izumin5210
Last active March 15, 2018 11:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save izumin5210/68f35bdea0b8e72f22f7824b0bc4560a to your computer and use it in GitHub Desktop.
Save izumin5210/68f35bdea0b8e72f22f7824b0bc4560a to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"reflect"
"sort"
"sync"
"time"
)
type Dispatcher struct {
workerCnt int
}
func (d *Dispatcher) Run(in <-chan []int, out chan<- []int) {
var wg sync.WaitGroup
wg.Add(d.workerCnt)
for i := 0; i < d.workerCnt; i++ {
go func(i int) {
defer wg.Done()
(&Worker{i: i}).Run(in, out)
}(i)
}
wg.Wait()
close(out)
}
type Worker struct {
i int
}
func (w *Worker) Run(in <-chan []int, out chan<- []int) {
for v := range in {
time.Sleep(1 * time.Second)
fmt.Printf("worker[%d] received %d items\n", w.i, len(v))
out <- v
}
}
type Aggregator struct {
}
func (a *Aggregator) Run(ctx context.Context, in <-chan int, out chan<- []int) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
defer close(out)
list := make([]int, 0, 1000)
for {
select {
case v, ok := <-in:
if ok {
list = append(list, v)
if len(list) > 1000 {
out <- list
list = make([]int, 0, 1000)
}
}
case <-ticker.C:
if len(list) > 100 {
out <- list
list = make([]int, 0, 1000)
}
case <-ctx.Done():
for v := range in {
list = append(list, v)
}
out <- list
return
}
}
}
func main() {
var got, want []int
for i := 0; i < 100000; i++ {
want = append(want, i)
}
var wg sync.WaitGroup
queue := make(chan int)
workerQueue := make(chan []int)
out := make(chan []int)
wg.Add(3)
// Dispatcher の結果を取り出し
go func() {
defer wg.Done()
for v := range out {
got = append(got, v...)
}
}()
// 仕事する人
go func() {
defer wg.Done()
(&Dispatcher{workerCnt: 8}).Run(workerQueue, out)
}()
// とりだして aggregate する人
tickerCtx, tickerCancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
defer wg.Done()
(&Aggregator{}).Run(ctx, queue, workerQueue)
}(tickerCtx)
// queuing する人(=> ユーザからのリクエスト)
n := 50
var queuingWg sync.WaitGroup
queuingWg.Add(n)
for i := 0; i < n; i++ {
go func(list []int) {
defer queuingWg.Done()
ticker := time.NewTicker(5 * time.Millisecond)
defer ticker.Stop()
for {
if len(list) == 0 {
break
}
select {
case <-ticker.C:
queue <- list[0]
}
list = list[1:]
}
}(want[i*len(want)/n : (i+1)*len(want)/n])
}
// queuing 終わったら queue を close
// 実アプリ上では "server の graceful shutdown 完了" に相当
queuingWg.Wait()
close(queue)
fmt.Println("queue closed")
// "queue から取り出して worker に突っ込むくん" を close
tickerCancel()
// worker 終了待ち
wg.Wait()
// データ整合性チェック
if len(got) == len(want) {
fmt.Println("valid length")
sort.Ints(got)
if reflect.DeepEqual(got, want) {
fmt.Println("valid data")
} else {
fmt.Println("invalid data")
}
} else {
fmt.Printf("invalid length: got %d, want %d\n", len(got), len(want))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment