Created
October 23, 2019 10:52
-
-
Save millken/3dcbcec91c6fa3c7267866e683473732 to your computer and use it in GitHub Desktop.
Job Queue with Worker Pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
"strconv" | |
"sync" | |
"time" | |
) | |
// Job is a unit of work that can be submitted to a JobQueue.
//
// A worker goroutine calls Process on each job it receives.
type Job interface {
	// Process performs the job's work.
	Process()
}
// Worker - the worker threads that actually process the jobs | |
type Worker struct { | |
done sync.WaitGroup | |
readyPool chan chan Job | |
assignedJobQueue chan Job | |
quit chan bool | |
} | |
// JobQueue - a queue for enqueueing jobs to be processed | |
type JobQueue struct { | |
internalQueue chan Job | |
readyPool chan chan Job | |
workers []*Worker | |
dispatcherStopped sync.WaitGroup | |
workersStopped sync.WaitGroup | |
quit chan bool | |
} | |
// NewJobQueue - creates a new job queue | |
func NewJobQueue(maxWorkers int) *JobQueue { | |
workersStopped := sync.WaitGroup{} | |
readyPool := make(chan chan Job, maxWorkers) | |
workers := make([]*Worker, maxWorkers, maxWorkers) | |
for i := 0; i < maxWorkers; i++ { | |
workers[i] = NewWorker(readyPool, workersStopped) | |
} | |
return &JobQueue{ | |
internalQueue: make(chan Job), | |
readyPool: readyPool, | |
workers: workers, | |
dispatcherStopped: sync.WaitGroup{}, | |
workersStopped: workersStopped, | |
quit: make(chan bool), | |
} | |
} | |
// Start - starts the worker routines and dispatcher routine | |
func (q *JobQueue) Start() { | |
for i := 0; i < len(q.workers); i++ { | |
q.workers[i].Start() | |
} | |
go q.dispatch() | |
} | |
// Stop - stops the workers and sispatcher routine | |
func (q *JobQueue) Stop() { | |
q.quit <- true | |
q.dispatcherStopped.Wait() | |
} | |
func (q *JobQueue) dispatch() { | |
q.dispatcherStopped.Add(1) | |
for { | |
select { | |
case job := <-q.internalQueue: // We got something in on our queue | |
workerChannel := <-q.readyPool // Check out an available worker | |
workerChannel <- job // Send the request to the channel | |
case <-q.quit: | |
for i := 0; i < len(q.workers); i++ { | |
q.workers[i].Stop() | |
} | |
q.workersStopped.Wait() | |
q.dispatcherStopped.Done() | |
return | |
} | |
} | |
} | |
// Submit - adds a new job to be processed | |
func (q *JobQueue) Submit(job Job) { | |
q.internalQueue <- job | |
} | |
// NewWorker - creates a new worker | |
func NewWorker(readyPool chan chan Job, done sync.WaitGroup) *Worker { | |
return &Worker{ | |
done: done, | |
readyPool: readyPool, | |
assignedJobQueue: make(chan Job), | |
quit: make(chan bool), | |
} | |
} | |
// Start - begins the job processing loop for the worker | |
func (w *Worker) Start() { | |
go func() { | |
w.done.Add(1) | |
for { | |
w.readyPool <- w.assignedJobQueue // check the job queue in | |
select { | |
case job := <-w.assignedJobQueue: // see if anything has been assigned to the queue | |
job.Process() | |
case <-w.quit: | |
w.done.Done() | |
return | |
} | |
} | |
}() | |
} | |
// Stop - stops the worker | |
func (w *Worker) Stop() { | |
w.quit <- true | |
} | |
//////////////// Example ////////////////// | |
// TestJob is an example job that holds only an ID to show state.
type TestJob struct {
	// ID identifies the job in the line printed by Process.
	ID string
}
// Process - test process function | |
func (t *TestJob) Process() { | |
fmt.Printf("Processing job '%s'\n", t.ID) | |
time.Sleep(1 * time.Second) | |
} | |
func main() { | |
queue := NewJobQueue(runtime.NumCPU()) | |
queue.Start() | |
defer queue.Stop() | |
for i := 0; i < 4*runtime.NumCPU(); i++ { | |
queue.Submit(&TestJob{strconv.Itoa(i)}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment