Skip to content

Instantly share code, notes, and snippets.

@frengky
Created April 4, 2019 12:10
Show Gist options
  • Save frengky/96fe9f0e4508e5f35d09ff36c0d03a75 to your computer and use it in GitHub Desktop.
Save frengky/96fe9f0e4508e5f35d09ff36c0d03a75 to your computer and use it in GitHub Desktop.
Worker pool implementation in Go
package msgr
import (
"log"
)
// Task is the unit of work a Worker runs. Its only method is unexported, so
// types declared outside this package cannot implement it directly.
type Task interface {
// execute performs the work and returns a result string or an error;
// Worker.Start logs whichever one it gets.
execute() (string, error)
}
// Job pairs a Task with a human-readable description.
type Job struct {
// desc is the prefix of the log line the worker writes for this job.
desc string
// payload is the Task whose execute method the worker calls.
payload Task
}
// JobQueue is the channel the dispatcher receives work requests from.
//
// It is declared without an initializer, so it is nil until the program
// assigns it (for example JobQueue = make(chan Job, n)). Sends to and receives
// from a nil channel block forever, so it has to be assigned before any job
// can be queued or dispatched.
var JobQueue chan Job
// Worker runs the Jobs that are delivered on its own Work channel.
type Worker struct {
// Id identifies the worker; Dispatcher.Run numbers its workers from 1.
Id int
// WorkerPool is the shared channel on which the worker publishes its Work
// channel each time it is ready for another job.
WorkerPool chan chan Job
// Work is the worker's private channel on which it receives jobs.
Work chan Job
// quit tells the run loop started by Start to return.
quit chan bool
}
// NewWorker builds a Worker with the given id that will advertise itself on
// workerPool. The worker gets fresh, unbuffered Work and quit channels; it does
// nothing until Start is called.
func NewWorker(id int, workerPool chan chan Job) Worker {
	return Worker{
		Id:         id,
		WorkerPool: workerPool,
		Work:       make(chan Job),
		quit:       make(chan bool),
	}
}
// Start launches the worker's run loop in a new goroutine and returns
// immediately.
//
// On every iteration the worker publishes its Work channel on WorkerPool and
// then waits for a job on Work. The job's payload is executed and its result,
// or its error, is logged with the job description as prefix. A job without a
// payload is logged and skipped instead of crashing the process with a nil
// method call.
//
// The loop returns as soon as a value arrives on quit, whether the worker is
// waiting for a job or still trying to register.
//
// NOTE: a worker that quits after it has registered leaves its Work channel in
// WorkerPool; a job later sent on that channel is never received.
func (w Worker) Start() {
	go func() {
		for {
			// Register the current worker into the worker queue. Selecting on
			// quit as well keeps the worker stoppable when WorkerPool has no
			// room for the registration.
			select {
			case w.WorkerPool <- w.Work:
			case <-w.quit:
				return
			}

			select {
			case job := <-w.Work:
				// We have received a work request.
				if job.payload == nil {
					log.Printf("%s: no task to execute", job.desc)
					continue
				}
				result, err := job.payload.execute()
				if err != nil {
					log.Printf("%s: %v", job.desc, err)
					continue
				}
				log.Printf("%s: %s", job.desc, result)
			case <-w.quit:
				// We have received a signal to stop.
				return
			}
		}
	}()
}
// Stop asks the worker's run loop to exit and returns without waiting for it.
//
// The signal is sent from a separate goroutine, so Stop itself never blocks.
// With the unbuffered quit channel that NewWorker creates, that goroutine stays
// blocked until something receives the value.
func (w Worker) Stop() {
	quit := w.quit
	go func() { quit <- true }()
}
// Dispatcher hands the jobs it takes from JobQueue to idle workers.
type Dispatcher struct {
// WorkerPool carries the Work channels of workers that are waiting for a job.
WorkerPool chan chan Job
// maxWorkers is the number of workers Run starts; NewDispatcher also uses it
// as the buffer size of WorkerPool.
maxWorkers int
}
// NewDispatcher returns a Dispatcher configured for nWorkers workers. Its
// WorkerPool is buffered with room for nWorkers entries. No goroutine is
// started until Run is called. A negative nWorkers makes the channel
// allocation panic.
func NewDispatcher(nWorkers int) *Dispatcher {
	d := new(Dispatcher)
	d.WorkerPool = make(chan chan Job, nWorkers)
	d.maxWorkers = nWorkers
	return d
}
// Run starts maxWorkers workers, numbered from 1 and all sharing the
// dispatcher's WorkerPool, then starts the dispatch loop in its own goroutine.
// It returns without waiting for any of them.
//
// The workers are not retained by the dispatcher, so Run offers no handle for
// stopping them later.
func (d *Dispatcher) Run() {
	log.Printf("Dispatching %v workers", d.maxWorkers)
	for id := 1; id <= d.maxWorkers; id++ {
		NewWorker(id, d.WorkerPool).Start()
	}
	go d.dispatch()
}
// dispatch forwards jobs from JobQueue to idle workers until JobQueue is
// closed.
//
// Each job is handed off in its own goroutine, which waits for a worker to
// publish its Work channel on WorkerPool and then sends the job on it. The loop
// therefore keeps draining JobQueue while all workers are busy; the number of
// pending hand-off goroutines is not bounded.
//
// A receive from a closed channel succeeds immediately with a zero value, so
// the closed state is checked explicitly. Without the check the loop would
// spin and spawn a goroutine with an empty Job on every pass.
func (d *Dispatcher) dispatch() {
	for {
		// Received work request.
		job, ok := <-JobQueue
		if !ok {
			// JobQueue was closed: no further jobs will ever arrive.
			return
		}

		go func(j Job) {
			work := <-d.WorkerPool
			// Dispatching work request.
			work <- j
		}(job)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment