Skip to content

Instantly share code, notes, and snippets.

@rongyi
Created November 15, 2016 06:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rongyi/436a54de5162dd5a765c381e88608120 to your computer and use it in GitHub Desktop.
Save rongyi/436a54de5162dd5a765c381e88608120 to your computer and use it in GitHub Desktop.
golang worker queue
package main
import (
"log"
"time"
)
// MaxWorker is the maximum number of workers in the pool.
const MaxWorker = 100
// Job represents a single unit of work handed to a worker.
type Job struct {
	// Payload is the data carried by the job.
	Payload string
}
// JobQueue is the package-level channel the dispatcher receives jobs from.
// It is nil as declared here; main assigns it with make before submitting jobs.
var JobQueue chan Job
// Worker receives jobs on its own channel and advertises that channel on a
// shared pool whenever it is ready for more work.
type Worker struct {
	// WorkerPool is the shared pool on which the worker publishes JobChannel.
	WorkerPool chan chan Job
	// JobChannel is the channel on which this worker receives its jobs.
	JobChannel chan Job
	// quit is the channel Stop sends on.
	quit chan bool
	// id is the identifier passed to NewWorker.
	id int
}
// NewWorker builds a Worker bound to workerPool and tagged with id.
// The worker gets fresh, unbuffered job and quit channels; it does nothing
// until Start is called.
func NewWorker(workerPool chan chan Job, id int) Worker {
	w := Worker{
		id:         id,
		WorkerPool: workerPool,
	}
	w.JobChannel = make(chan Job)
	w.quit = make(chan bool)
	return w
}
// Start launches the worker loop in a new goroutine and returns immediately.
//
// On every iteration the worker publishes its JobChannel on WorkerPool to
// signal that it is idle, then waits for a job and logs its payload. The loop
// also listens on the quit channel at both blocking points, so a signal sent
// by Stop terminates the goroutine instead of being ignored.
func (w Worker) Start() {
	go func() {
		for {
			// Advertise availability, unless a stop request arrives first.
			select {
			case w.WorkerPool <- w.JobChannel:
			case <-w.quit:
				return
			}

			select {
			case job := <-w.JobChannel:
				// The real job action.
				log.Println(job.Payload)
			case <-w.quit:
				// NOTE: JobChannel may still be registered in WorkerPool at
				// this point; a sender that later picks it will block.
				return
			}
		}
	}()
}
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
// Dispatcher owns the worker pool and hands queued jobs to idle workers.
type Dispatcher struct {
	// WorkerPool carries the job channels of workers that are ready for work.
	WorkerPool chan chan Job
	// MaxWorker is the number of workers Run starts.
	MaxWorker int
}
// NewDispatcher returns a Dispatcher whose worker pool is buffered to hold
// maxWorkers job channels. No workers are started until Run is called.
func NewDispatcher(maxWorkers int) *Dispatcher {
	return &Dispatcher{
		MaxWorker:  maxWorkers,
		WorkerPool: make(chan chan Job, maxWorkers),
	}
}
// Run creates and starts d.MaxWorker workers sharing the dispatcher's pool,
// launches the dispatch loop in its own goroutine, and returns the started
// workers so the caller can stop them later.
func (d *Dispatcher) Run() []Worker {
	var workers []Worker
	for id := 0; id < d.MaxWorker; id++ {
		w := NewWorker(d.WorkerPool, id)
		w.Start()
		workers = append(workers, w)
	}

	go d.dispatch()

	return workers
}
// dispatch loops forever, taking jobs off JobQueue. Each job is handed off in
// its own goroutine, which waits for an idle worker's channel from the pool
// and then sends the job to it, so the loop itself never waits on a worker.
func (d *Dispatcher) dispatch() {
	for {
		next := <-JobQueue
		go func(j Job) {
			idle := <-d.WorkerPool
			idle <- j
		}(next)
	}
}
// main wires up the job queue and dispatcher, submits one million jobs,
// waits a fixed ten seconds for the workers to drain them, and then asks
// every worker to stop.
func main() {
	JobQueue = make(chan Job)

	// Size the pool from the package constant rather than a duplicated literal.
	d := NewDispatcher(MaxWorker)
	workers := d.Run()

	for i := 0; i < 1000000; i++ {
		job := Job{
			Payload: "hello",
		}
		JobQueue <- job
	}

	// Fixed grace period for in-flight jobs; there is no completion signal.
	time.Sleep(time.Second * 10)
	log.Println("stop here")

	for _, curWorker := range workers {
		log.Println("stop worker")
		// Stop is asynchronous; main does not wait for workers to exit.
		curWorker.Stop()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment