Created
September 11, 2017 12:14
-
-
Save Chen-tao/69d3dd7202dba1beb8abedf6fe712308 to your computer and use it in GitHub Desktop.
Golang Workers / Job Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Original code with Dispatcher | |
package main | |
import ( | |
_ "expvar" | |
"flag" | |
"fmt" | |
"net/http" | |
_ "net/http/pprof" | |
"os" | |
"time" | |
) | |
// Job holds the attributes needed to perform unit of work. | |
type Job struct { | |
Name string | |
Delay time.Duration | |
} | |
// NewWorker creates takes a numeric id and a channel w/ worker pool. | |
func NewWorker(id int, workerPool chan chan Job) Worker { | |
return Worker{ | |
id: id, | |
jobQueue: make(chan Job), | |
workerPool: workerPool, | |
quitChan: make(chan bool), | |
} | |
} | |
type Worker struct { | |
id int | |
jobQueue chan Job | |
workerPool chan chan Job | |
quitChan chan bool | |
} | |
func (w Worker) start() { | |
go func() { | |
for { | |
// Add my jobQueue to the worker pool. | |
w.workerPool <- w.jobQueue | |
select { | |
case job := <-w.jobQueue: | |
// Dispatcher has added a job to my jobQueue. | |
fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) | |
time.Sleep(job.Delay) | |
fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) | |
case <-w.quitChan: | |
// We have been asked to stop. | |
fmt.Printf("worker%d stopping\n", w.id) | |
return | |
} | |
} | |
}() | |
} | |
func (w Worker) stop() { | |
go func() { | |
w.quitChan <- true | |
}() | |
} | |
// NewDispatcher creates, and returns a new Dispatcher object. | |
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher { | |
workerPool := make(chan chan Job, maxWorkers) | |
return &Dispatcher{ | |
jobQueue: jobQueue, | |
maxWorkers: maxWorkers, | |
workerPool: workerPool, | |
} | |
} | |
type Dispatcher struct { | |
workerPool chan chan Job | |
maxWorkers int | |
jobQueue chan Job | |
} | |
func (d *Dispatcher) run() { | |
for i := 0; i < d.maxWorkers; i++ { | |
worker := NewWorker(i+1, d.workerPool) | |
worker.start() | |
} | |
go d.dispatch() | |
} | |
func (d *Dispatcher) dispatch() { | |
for { | |
select { | |
case job := <-d.jobQueue: | |
go func() { | |
fmt.Printf("fetching workerJobQueue for: %s\n", job.Name) | |
workerJobQueue := <-d.workerPool | |
fmt.Printf("adding %s to workerJobQueue\n", job.Name) | |
workerJobQueue <- job | |
}() | |
} | |
} | |
} | |
func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { | |
// Make sure we can only be called with an HTTP POST request. | |
if r.Method != "POST" { | |
w.Header().Set("Allow", "POST") | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
// Parse the delay. | |
delay, err := time.ParseDuration(r.FormValue("delay")) | |
if err != nil { | |
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) | |
return | |
} | |
// Validate delay is in range 1 to 10 seconds. | |
if delay.Seconds() < 1 || delay.Seconds() > 10 { | |
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) | |
return | |
} | |
// Set name and validate value. | |
name := r.FormValue("name") | |
if name == "" { | |
http.Error(w, "You must specify a name.", http.StatusBadRequest) | |
return | |
} | |
// Create Job and push the work onto the jobQueue. | |
job := Job{Name: name, Delay: delay} | |
jobQueue <- job | |
// Render success. | |
w.WriteHeader(http.StatusCreated) | |
} | |
func main() { | |
var ( | |
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") | |
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") | |
port = flag.String("port", "8080", "The server port") | |
) | |
flag.Parse() | |
// Create the job queue. | |
jobQueue := make(chan Job, *maxQueueSize) | |
// Start the dispatcher. | |
dispatcher := NewDispatcher(jobQueue, *maxWorkers) | |
dispatcher.run() | |
// Start the HTTP handler. | |
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { | |
requestHandler(w, r, jobQueue) | |
}) | |
log.Fatal(http.ListenAndServe(":"+*port, nil)) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
_ "expvar" | |
"flag" | |
"fmt" | |
"log" | |
"net/http" | |
_ "net/http/pprof" | |
"time" | |
) | |
type job struct { | |
name string | |
duration time.Duration | |
} | |
func doWork(id int, j job) { | |
fmt.Printf("worker%d: started %s, working for %f seconds\n", id, j.name, j.duration.Seconds()) | |
time.Sleep(j.duration) | |
fmt.Printf("worker%d: completed %s!\n", w.id, j.name) | |
} | |
func requestHandler(jobs chan job, w http.ResponseWriter, r *http.Request) { | |
// Make sure we can only be called with an HTTP POST request. | |
if r.Method != "POST" { | |
w.Header().Set("Allow", "POST") | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
// Parse the durations. | |
duration, err := time.ParseDuration(r.FormValue("delay")) | |
if err != nil { | |
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) | |
return | |
} | |
// Validate delay is in range 1 to 10 seconds. | |
if duration.Seconds() < 1 || duration.Seconds() > 10 { | |
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) | |
return | |
} | |
// Set name and validate value. | |
name := r.FormValue("name") | |
if name == "" { | |
http.Error(w, "You must specify a name.", http.StatusBadRequest) | |
return | |
} | |
// Create Job and push the work onto the jobCh. | |
job := job{name, duration} | |
go func() { | |
fmt.Printf("added: %s %s\n", job.name, job.duration) | |
jobs <- job | |
}() | |
// Render success. | |
w.WriteHeader(http.StatusCreated) | |
return | |
} | |
func main() { | |
var ( | |
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") | |
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") | |
port = flag.String("port", "8080", "The server port") | |
) | |
flag.Parse() | |
// create job channel | |
jobs := make(chan job, *maxQueueSize) | |
// create workers | |
for i := 1; i <= *maxWorkers; i++ { | |
go func(i int) { | |
for j := range jobs { | |
doWork(i, j) | |
} | |
}(i) | |
} | |
// handler for adding jobs | |
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { | |
requestHandler(jobs, w, r) | |
}) | |
log.Fatal(http.ListenAndServe(":"+*port, nil)) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
_ "expvar" | |
"fmt" | |
"math/rand" | |
"sync" | |
"time" | |
) | |
const maxWorkers = 10 | |
type job struct { | |
name string | |
duration time.Duration | |
} | |
func doWork(id int, j job) { | |
fmt.Printf("worker%d: started %s, working for %fs\n", id, j.name, j.duration.Seconds()) | |
time.Sleep(j.duration) | |
fmt.Printf("worker%d: completed %s!\n", id, j.name) | |
} | |
func main() { | |
// channel for jobs | |
jobs := make(chan job) | |
// start workers | |
wg := &sync.WaitGroup{} | |
wg.Add(maxWorkers) | |
for i := 1; i <= maxWorkers; i++ { | |
go func(i int) { | |
defer wg.Done() | |
for j := range jobs { | |
doWork(i, j) | |
} | |
}(i) | |
} | |
// add jobs | |
for i := 0; i < 100; i++ { | |
name := fmt.Sprintf("job-%d", i) | |
duration := time.Duration(rand.Intn(1000)) * time.Millisecond | |
fmt.Printf("adding: %s %s\n", name, duration) | |
jobs <- job{name, duration} | |
} | |
close(jobs) | |
// wait for workers to complete | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment