Skip to content

Instantly share code, notes, and snippets.

@hawell
Last active October 22, 2018 14:13
Show Gist options
  • Save hawell/ee7a1e7ae22c0e35e43b8bc60cc71e53 to your computer and use it in GitHub Desktop.
Save hawell/ee7a1e7ae22c0e35e43b8bc60cc71e53 to your computer and use it in GitHub Desktop.
Golang WorkerPool
package main
import (
"fmt"
"time"
)
// Job is a unit of work that can be scheduled on the worker pool.
// A worker executes a job by calling its Handle method.
type Job interface {
	// Handle performs the job's work.
	Handle()
}
// Dispatcher owns a fixed set of workers and hands queued jobs to whichever
// worker has announced itself as idle.
type Dispatcher struct {
	// WorkerPool carries the job channels of workers that are ready for work.
	WorkerPool chan chan Job
	// WorkerList holds every worker created for this dispatcher.
	WorkerList []*Worker
	// MaxWorkers is the number of workers requested at construction time.
	MaxWorkers int
	// MaxWaitingJobs is the limit on buffered, not-yet-dispatched jobs;
	// NewDispatcher uses its maxWaitingJobs argument as the JobQueue capacity.
	MaxWaitingJobs int
	// JobQueue buffers submitted jobs until the Dispatch loop picks them up.
	JobQueue chan Job
	// quit tells the Dispatch loop to stop the workers and return.
	quit chan bool
}
// NewDispatcher builds a Dispatcher with maxWorkers workers and a job queue
// that buffers up to maxWaitingJobs jobs.
//
// The workers are created (with ids 0..maxWorkers-1, all sharing the
// dispatcher's WorkerPool) but not started; call Run to start them.
func NewDispatcher(maxWorkers int, maxWaitingJobs int) *Dispatcher {
	d := &Dispatcher{
		WorkerPool: make(chan chan Job, maxWorkers),
		// The final length is known, so size the slice once up front.
		WorkerList: make([]*Worker, 0, maxWorkers),
		MaxWorkers: maxWorkers,
		// Record the queue limit so the field reflects the real JobQueue capacity.
		MaxWaitingJobs: maxWaitingJobs,
		JobQueue:       make(chan Job, maxWaitingJobs),
		quit:           make(chan bool),
	}
	for i := 0; i < maxWorkers; i++ {
		d.WorkerList = append(d.WorkerList, NewWorker(d.WorkerPool, i))
	}
	return d
}
// Run starts every worker in WorkerList and then launches the Dispatch loop
// in its own goroutine. It does not wait for the loop to finish.
func (d *Dispatcher) Run() {
	for i := 0; i < len(d.WorkerList); i++ {
		d.WorkerList[i].Run()
	}
	go d.Dispatch()
}
// Dispatch is the dispatcher's main loop. It takes jobs off JobQueue one at a
// time, waits for an idle worker to appear in WorkerPool and hands the job to
// that worker's channel. It returns once a value arrives on the quit channel,
// after asking every worker in WorkerList to stop.
//
// The hand-off happens inline rather than in a per-job goroutine, so the
// number of goroutines stays fixed and nothing is left behind after shutdown.
// Every blocking step also watches quit, which means a job that has been
// taken off JobQueue but not yet accepted by a worker when quit arrives is
// discarded; jobs still buffered in JobQueue are left untouched.
func (d *Dispatcher) Dispatch() {
	// Whatever path leaves the loop, every worker is told to stop.
	defer func() {
		for _, w := range d.WorkerList {
			w.Stop()
		}
	}()

	for {
		select {
		case job := <-d.JobQueue:
			// Wait for an idle worker, but stay responsive to shutdown.
			var jobChannel chan Job
			select {
			case jobChannel = <-d.WorkerPool:
			case <-d.quit:
				return
			}
			// A worker that registered itself is normally already receiving on
			// its channel; quit is still watched in case it was stopped directly.
			select {
			case jobChannel <- job:
			case <-d.quit:
				return
			}
		case <-d.quit:
			return
		}
	}
}
// Queue submits job for execution and never blocks the caller.
//
// While JobQueue has free buffer space the job is enqueued directly, which
// preserves submission order and costs no goroutine. Only when the buffer is
// full does Queue fall back to a background goroutine that waits for room.
func (d *Dispatcher) Queue(job Job) {
	select {
	case d.JobQueue <- job:
		// Fast path: there was room in the buffer.
	default:
		// Buffer full: park the send in the background so the caller can go on.
		go func() { d.JobQueue <- job }()
	}
}
// Stop asks the Dispatch loop to shut down by sending on the quit channel.
// NewDispatcher creates that channel unbuffered, so Stop blocks until the
// signal has been received.
func (d *Dispatcher) Stop() {
	const shutdown = true
	d.quit <- shutdown
}
// Worker runs jobs one at a time. It advertises itself as idle by placing its
// own JobChannel into the shared WorkerPool and then waits for a job on it.
type Worker struct {
	// Id identifies the worker; it appears in the message printed on quit.
	Id int
	// WorkerPool is the shared channel on which idle workers register.
	WorkerPool chan chan Job
	// JobChannel is this worker's private inbox for jobs.
	JobChannel chan Job
	// quit tells the worker's loop to exit.
	quit chan bool
}
// NewWorker creates a worker with the given id that registers on workerPool.
// The worker gets fresh, unbuffered job and quit channels and is not started
// until Run is called.
func NewWorker(workerPool chan chan Job, id int) *Worker {
	w := new(Worker)
	w.Id = id
	w.WorkerPool = workerPool
	w.JobChannel = make(chan Job)
	w.quit = make(chan bool)
	return w
}
// Run starts the worker's loop in a new goroutine and returns without
// waiting for it.
//
// Each pass of the loop first advertises the worker as idle by sending its
// JobChannel into WorkerPool. It then waits for either a job on that channel,
// which it handles synchronously, or a quit signal, which ends the goroutine
// after printing a short notice.
func (w *Worker) Run() {
	serve := func() {
		for {
			// Register as available before waiting for work.
			w.WorkerPool <- w.JobChannel

			select {
			case <-w.quit:
				fmt.Println("worker ", w.Id, " quit")
				return
			case next := <-w.JobChannel:
				next.Handle()
			}
		}
	}
	go serve()
}
// Stop signals the worker to quit without blocking the caller: the send on
// the quit channel is performed by a background goroutine, which completes
// once the worker's loop receives the signal.
func (w *Worker) Stop() {
	signal := func() { w.quit <- true }
	go signal()
}
// AJob is a demo Job whose payload is simply the text to print.
type AJob string

// Handle prints the job's text, followed by a newline, to standard output.
func (j AJob) Handle() {
	fmt.Println(string(j))
}
// main is a small demonstration of the pool: it starts a dispatcher with ten
// workers and a 100-slot queue, submits one hundred printing jobs, and then
// keeps the process alive for five seconds so the workers can run them.
func main() {
	const (
		workers  = 10
		backlog  = 100
		jobCount = 100
	)

	pool := NewDispatcher(workers, backlog)
	pool.Run()

	for n := 0; n < jobCount; n++ {
		pool.Queue(AJob(fmt.Sprintf("job %d", n)))
	}

	// Nothing signals completion here, so simply wait before the process exits.
	time.Sleep(5 * time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment