Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Job queues in Golang

Golang Workers / Job Queue

A running example of the code from:

This gist creates a working example from blog post, and a alternate example using simple worker pool.

TLDR: if you want simple and controlled concurrency use a worker pool.

Step 1

Small refactorings made to original code:

Step 2

Simplify the worker queue by removing the Dispatcher.

  • Creates workers directly and passes job queue to them

https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-worker_refactored-go

Run the Application

Boot either the worker_original.go or the worker_refactored.go applications. Use flags to adjust the max_workers and max_queue_size to override the default values.

$ go run worker_original.go -max_workers 5

cURL the application from another terminal window:

$ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done

Performance

The test run with Pprof show performance characteristics remain the same between both examples.

// Original code with Dispatcher
package main
import (
_ "expvar"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"time"
)
// Job holds the attributes needed to perform unit of work.
type Job struct {
Name string
Delay time.Duration
}
// NewWorker creates takes a numeric id and a channel w/ worker pool.
func NewWorker(id int, workerPool chan chan Job) Worker {
return Worker{
id: id,
jobQueue: make(chan Job),
workerPool: workerPool,
quitChan: make(chan bool),
}
}
type Worker struct {
id int
jobQueue chan Job
workerPool chan chan Job
quitChan chan bool
}
func (w Worker) start() {
go func() {
for {
// Add my jobQueue to the worker pool.
w.workerPool <- w.jobQueue
select {
case job := <-w.jobQueue:
// Dispatcher has added a job to my jobQueue.
fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds())
time.Sleep(job.Delay)
fmt.Printf("worker%d: completed %s!\n", w.id, job.Name)
case <-w.quitChan:
// We have been asked to stop.
fmt.Printf("worker%d stopping\n", w.id)
return
}
}
}()
}
func (w Worker) stop() {
go func() {
w.quitChan <- true
}()
}
// NewDispatcher creates, and returns a new Dispatcher object.
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
workerPool := make(chan chan Job, maxWorkers)
return &Dispatcher{
jobQueue: jobQueue,
maxWorkers: maxWorkers,
workerPool: workerPool,
}
}
type Dispatcher struct {
workerPool chan chan Job
maxWorkers int
jobQueue chan Job
}
func (d *Dispatcher) run() {
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(i+1, d.workerPool)
worker.start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-d.jobQueue:
go func() {
fmt.Printf("fetching workerJobQueue for: %s\n", job.Name)
workerJobQueue := <-d.workerPool
fmt.Printf("adding %s to workerJobQueue\n", job.Name)
workerJobQueue <- job
}()
}
}
}
func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the delay.
delay, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Validate delay is in range 1 to 10 seconds.
if delay.Seconds() < 1 || delay.Seconds() > 10 {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Set name and validate value.
name := r.FormValue("name")
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
}
// Create Job and push the work onto the jobQueue.
job := Job{Name: name, Delay: delay}
jobQueue <- job
// Render success.
w.WriteHeader(http.StatusCreated)
}
func main() {
var (
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start")
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
port = flag.String("port", "8080", "The server port")
)
flag.Parse()
// Create the job queue.
jobQueue := make(chan Job, *maxQueueSize)
// Start the dispatcher.
dispatcher := NewDispatcher(jobQueue, *maxWorkers)
dispatcher.run()
// Start the HTTP handler.
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
requestHandler(w, r, jobQueue)
})
log.Fatal(http.ListenAndServe(":"+*port, nil))
}
package main
import (
_ "expvar"
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"time"
)
type job struct {
name string
duration time.Duration
}
func doWork(id int, j job) {
fmt.Printf("worker%d: started %s, working for %f seconds\n", id, j.name, j.duration.Seconds())
time.Sleep(j.duration)
fmt.Printf("worker%d: completed %s!\n", w.id, j.name)
}
func requestHandler(jobs chan job, w http.ResponseWriter, r *http.Request) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the durations.
duration, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Validate delay is in range 1 to 10 seconds.
if duration.Seconds() < 1 || duration.Seconds() > 10 {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Set name and validate value.
name := r.FormValue("name")
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
}
// Create Job and push the work onto the jobCh.
job := job{name, duration}
go func() {
fmt.Printf("added: %s %s\n", job.name, job.duration)
jobs <- job
}()
// Render success.
w.WriteHeader(http.StatusCreated)
return
}
func main() {
var (
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start")
port = flag.String("port", "8080", "The server port")
)
flag.Parse()
// create job channel
jobs := make(chan job, *maxQueueSize)
// create workers
for i := 1; i <= *maxWorkers; i++ {
go func(i int) {
for j := range jobs {
doWork(i, j)
}
}(i)
}
// handler for adding jobs
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
requestHandler(jobs, w, r)
})
log.Fatal(http.ListenAndServe(":"+*port, nil))
}
package main
import (
_ "expvar"
"fmt"
"math/rand"
"sync"
"time"
)
const maxWorkers = 10
type job struct {
name string
duration time.Duration
}
func doWork(id int, j job) {
fmt.Printf("worker%d: started %s, working for %fs\n", id, j.name, j.duration.Seconds())
time.Sleep(j.duration)
fmt.Printf("worker%d: completed %s!\n", id, j.name)
}
func main() {
// channel for jobs
jobs := make(chan job)
// start workers
wg := &sync.WaitGroup{}
wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
go func(i int) {
defer wg.Done()
for j := range jobs {
doWork(i, j)
}
}(i)
}
// add jobs
for i := 0; i < 100; i++ {
name := fmt.Sprintf("job-%d", i)
duration := time.Duration(rand.Intn(1000)) * time.Millisecond
fmt.Printf("adding: %s %s\n", name, duration)
jobs <- job{name, duration}
}
close(jobs)
// wait for workers to complete
wg.Wait()
}
@KanybekMomukeyev

This comment has been minimized.

Copy link

commented Mar 28, 2017

Thanks good example to learn golang, would be there tests for this code?

@mocanuga

This comment has been minimized.

Copy link

commented May 2, 2017

Hi,

Thanks for this example code.
I would like to point out an error in the refactored code, at line 21.

There is no variable w. So w.id should be just id.

@feliperohdee

This comment has been minimized.

Copy link

commented May 5, 2017

Hi @harlow

whats the difference in use goroutine or not at this line https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-worker_refactored-go-L54?

@minhdv118

This comment has been minimized.

Copy link

commented May 12, 2017

hi.
I'm tested with jmetter and have error memory leak

http://2.pik.vn/2017ab2892ef-df80-43f1-a812-4c7347bdf94a.png

@vardius

This comment has been minimized.

Copy link

commented Jul 25, 2017

@feliperohdee

I quote the answer from @zerkms

if the jobs channel is not buffered or is full - then if you don't have a goroutine there it would block the whole method
with goroutine though the method returns and sends an http response to the client

@feliperohdee

This comment has been minimized.

Copy link

commented Sep 2, 2017

Thank u @vardius

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.