Skip to content

Instantly share code, notes, and snippets.

@chenlujjj
Created March 21, 2021 14:30
Show Gist options
  • Save chenlujjj/f2cc6b75e5276e41bf82b5d561fcf28f to your computer and use it in GitHub Desktop.
Save chenlujjj/f2cc6b75e5276e41bf82b5d561fcf28f to your computer and use it in GitHub Desktop.
example code from "Concurrency is not Parallelism" talk by Rob Pike
package main
import (
"container/heap"
"fmt"
"math/rand"
"time"
)
// Sizing of the simulation.
const (
	// nRequester is the number of requester goroutines started by main; it is
	// also used as the buffer size of each worker's request channel.
	nRequester = 100
	// nWorker is the number of workers in the balancer's pool; it also scales
	// the simulated work and think times.
	nWorker = 10
)
// op simulates a unit of work. It draws a random duration below one second,
// sleeps for nWorker times that duration, and returns the unscaled duration
// (in nanoseconds) as an int.
func op() int {
	base := rand.Int63n(int64(time.Second))
	pause := time.Duration(nWorker * base)
	time.Sleep(pause)
	return int(base)
}
// Request is one unit of work handed from a requester to a worker.
type Request struct {
	// fn is the operation the worker runs.
	fn func() int
	// c is the channel on which the worker sends fn's result back.
	c chan int
}
// requester generates load forever. Each iteration sleeps for a random time
// below 2*nWorker seconds, submits an op request on work, and then blocks
// until the result arrives on its private reply channel.
func requester(work chan Request) {
	reply := make(chan int)
	maxIdle := int64(nWorker * 2 * time.Second)
	for {
		idle := time.Duration(rand.Int63n(maxIdle))
		time.Sleep(idle)
		work <- Request{fn: op, c: reply}
		// Waiting on reply makes the request synchronous: the next request
		// is only issued after the response to this one has come back.
		<-reply
	}
}
// Worker is a member of the balancer's pool.
type Worker struct {
	// i is the worker's index in the heap.
	i int
	// requests carries the work to do (a buffered channel).
	requests chan Request
	// pending counts the tasks handed to this worker and not yet completed.
	pending int
}
// work is the worker's processing loop and never returns. For every request
// received on w.requests it runs the request's fn, sends the result on the
// request's reply channel, and then reports itself on done.
func (w *Worker) work(done chan *Worker) {
	for {
		job := <-w.requests
		result := job.fn()
		job.c <- result
		done <- w
	}
}
// Pool is a collection of workers that implements heap.Interface. Workers
// are ordered by their pending count, so the least loaded worker sits at the
// root of the heap.
type Pool []*Worker

// Len reports the number of workers in the pool.
func (p Pool) Len() int { return len(p) }

// Less orders workers by load: a worker with fewer pending tasks sorts first
// and is therefore preferred when work is assigned.
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

// Swap exchanges the workers at positions i and j and updates the heap index
// stored in each of them.
func (p *Pool) Swap(i, j int) {
	a := *p
	a[i], a[j] = a[j], a[i]
	a[i].i = i
	a[j].i = j
}

// Push appends x, which must be a *Worker, to the pool and records its index.
// It uses append so the pool grows when its capacity is exhausted instead of
// panicking on an out-of-range re-slice.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	w.i = len(*p)
	*p = append(*p, w)
}

// Pop removes and returns the last worker in the pool. The returned worker's
// index is set to -1 to mark it as no longer being in the heap.
func (p *Pool) Pop() interface{} {
	a := *p
	last := len(a) - 1
	w := a[last]
	a[last] = nil // do not keep the worker reachable through the backing array
	w.i = -1      // for safety
	*p = a[:last]
	return w
}
// Balancer distributes incoming requests over a pool of workers.
type Balancer struct {
	// pool holds the workers. Unless rr is set, dispatch and completed
	// maintain it as a heap ordered by each worker's pending count.
	pool Pool
	// done is the channel on which a worker reports itself after finishing a
	// request. NewBalancer gives it a buffer of nWorker entries.
	done chan *Worker
	// i is the index of the worker whose turn it is in round-robin mode.
	i int
	// rr switches the balancer to round-robin mode.
	// Note from the original author: both analysis and experiment show that
	// in round-robin mode the standard deviation of the workers' pending
	// counts is larger than with least-loaded assignment.
	rr bool
}
// NewBalancer builds a balancer with nWorker workers. Each worker gets a
// request channel buffered for nRequester requests, is pushed onto the pool
// heap, and is started in its own goroutine reporting on the shared done
// channel. The balancer starts in least-loaded (non round-robin) mode.
func NewBalancer() *Balancer {
	b := &Balancer{
		pool: make(Pool, 0, nWorker),
		done: make(chan *Worker, nWorker),
	}
	for n := nWorker; n > 0; n-- {
		w := &Worker{requests: make(chan Request, nRequester)}
		heap.Push(&b.pool, w)
		go w.work(b.done)
	}
	return b
}
// balance is the balancer's event loop and never returns. It waits for
// either a new request on work or a completion report on b.done, handles
// whichever arrives, and prints the pool's load after every event.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case finished := <-b.done:
			b.completed(finished)
		case incoming := <-work:
			b.dispatch(incoming)
		}
		b.print()
	}
}
// print writes one line to standard output: the pending count of every
// worker in the pool, followed by the mean and the variance of those counts.
func (b *Balancer) print() {
	var total, totalSq int
	for k := range b.pool {
		load := b.pool[k].pending
		fmt.Printf("%d ", load)
		total += load
		totalSq += load * load
	}
	n := float64(len(b.pool))
	avg := float64(total) / n
	// Variance computed as E[x^2] - (E[x])^2.
	variance := float64(totalSq)/n - avg*avg
	fmt.Printf(" %.2f %.2f\n", avg, variance)
}
// dispatch hands req to a worker and increments that worker's pending count.
// In the default mode the least loaded worker is popped off the heap, given
// the request, and pushed back. In round-robin mode the worker at index b.i
// receives it and b.i advances, wrapping around at the end of the pool.
func (b *Balancer) dispatch(req Request) {
	if !b.rr {
		// Grab the least loaded worker.
		least := heap.Pop(&b.pool).(*Worker)
		least.requests <- req
		least.pending++
		heap.Push(&b.pool, least)
		return
	}

	// Round-robin load balancing.
	next := b.pool[b.i]
	next.requests <- req
	next.pending++
	if b.i++; b.i >= len(b.pool) {
		b.i = 0
	}
}
// completed records that worker w has finished one task by decrementing its
// pending count. In round-robin mode nothing else happens. Otherwise the
// worker's position in the heap is re-established so that the least loaded
// worker stays at the root.
func (b *Balancer) completed(w *Worker) {
	w.pending--
	if b.rr {
		return
	}
	// heap.Fix restores the heap ordering for the element at w.i in place;
	// it replaces a Remove followed by a Push of the same worker.
	heap.Fix(&b.pool, w.i)
}
// main starts nRequester requester goroutines that share one unbuffered work
// channel, then runs a new balancer's event loop on that channel. The loop
// never returns.
func main() {
	work := make(chan Request)
	for remaining := nRequester; remaining > 0; remaining-- {
		go requester(work)
	}
	balancer := NewBalancer()
	balancer.balance(work)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment