Skip to content

Instantly share code, notes, and snippets.

@hnakagawa
Created June 23, 2017 16:09
Show Gist options
  • Save hnakagawa/5b367a47adfaaee876386b3005aa11d1 to your computer and use it in GitHub Desktop.
Save hnakagawa/5b367a47adfaaee876386b3005aa11d1 to your computer and use it in GitHub Desktop.
package worker
type Task func()
type Dispatcher struct {
TaskQueue chan Task
pool chan chan Task
workers []*worker
}
type worker struct {
pool chan chan Task
taskChannel chan Task
quit chan bool
}
func NewDispatcher(workerNum int, queueSize int) *Dispatcher {
pool := make(chan chan Task, workerNum)
return &Dispatcher{
TaskQueue: make(chan Task, queueSize),
pool: pool,
workers: make([]*worker, workerNum),
}
}
func (d *Dispatcher) Start() {
for i := 0; i < len(d.workers); i++ {
worker := newWorker(d.pool)
worker.start()
d.workers[i] = worker
}
go d.dispatch()
}
func (d *Dispatcher) Stop() {
for _, w := range d.workers {
w.stop()
}
}
func (d *Dispatcher) dispatch() {
for {
select {
case task := <-d.TaskQueue:
go func(task Task) {
taskChannel := <-d.pool
taskChannel <- task
}(task)
}
}
}
func newWorker(pool chan chan Task) *worker {
return &worker{
pool: pool,
taskChannel: make(chan Task),
quit: make(chan bool),
}
}
func (w *worker) start() {
go func() {
for {
w.pool <- w.taskChannel
select {
case task := <-w.taskChannel:
task()
case <-w.quit:
return
}
}
}()
}
func (w *worker) stop() {
go func() {
w.quit <- true
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment