|
// Original code with Dispatcher |
|
package main |
|
|
|
import ( |
|
_ "expvar" |
|
"flag" |
|
"fmt" |
|
"net/http" |
|
_ "net/http/pprof" |
|
"os" |
|
"time" |
|
) |
|
|
|
// Job describes a single unit of work submitted to the queue.
type Job struct {
	// Name identifies the job in log output.
	Name string

	// Delay is how long a worker sleeps to simulate processing the job.
	Delay time.Duration
}
|
|
|
// NewWorker creates takes a numeric id and a channel w/ worker pool. |
|
func NewWorker(id int, workerPool chan chan Job) Worker { |
|
return Worker{ |
|
id: id, |
|
jobQueue: make(chan Job), |
|
workerPool: workerPool, |
|
quitChan: make(chan bool), |
|
} |
|
} |
|
|
|
type Worker struct { |
|
id int |
|
jobQueue chan Job |
|
workerPool chan chan Job |
|
quitChan chan bool |
|
} |
|
|
|
func (w Worker) start() { |
|
go func() { |
|
for { |
|
// Add my jobQueue to the worker pool. |
|
w.workerPool <- w.jobQueue |
|
|
|
select { |
|
case job := <-w.jobQueue: |
|
// Dispatcher has added a job to my jobQueue. |
|
fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) |
|
time.Sleep(job.Delay) |
|
fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) |
|
case <-w.quitChan: |
|
// We have been asked to stop. |
|
fmt.Printf("worker%d stopping\n", w.id) |
|
return |
|
} |
|
} |
|
}() |
|
} |
|
|
|
func (w Worker) stop() { |
|
go func() { |
|
w.quitChan <- true |
|
}() |
|
} |
|
|
|
// NewDispatcher creates, and returns a new Dispatcher object. |
|
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher { |
|
workerPool := make(chan chan Job, maxWorkers) |
|
|
|
return &Dispatcher{ |
|
jobQueue: jobQueue, |
|
maxWorkers: maxWorkers, |
|
workerPool: workerPool, |
|
} |
|
} |
|
|
|
type Dispatcher struct { |
|
workerPool chan chan Job |
|
maxWorkers int |
|
jobQueue chan Job |
|
} |
|
|
|
func (d *Dispatcher) run() { |
|
for i := 0; i < d.maxWorkers; i++ { |
|
worker := NewWorker(i+1, d.workerPool) |
|
worker.start() |
|
} |
|
|
|
go d.dispatch() |
|
} |
|
|
|
func (d *Dispatcher) dispatch() { |
|
for { |
|
select { |
|
case job := <-d.jobQueue: |
|
go func() { |
|
fmt.Printf("fetching workerJobQueue for: %s\n", job.Name) |
|
workerJobQueue := <-d.workerPool |
|
fmt.Printf("adding %s to workerJobQueue\n", job.Name) |
|
workerJobQueue <- job |
|
}() |
|
} |
|
} |
|
} |
|
|
|
func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { |
|
// Make sure we can only be called with an HTTP POST request. |
|
if r.Method != "POST" { |
|
w.Header().Set("Allow", "POST") |
|
w.WriteHeader(http.StatusMethodNotAllowed) |
|
return |
|
} |
|
|
|
// Parse the delay. |
|
delay, err := time.ParseDuration(r.FormValue("delay")) |
|
if err != nil { |
|
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) |
|
return |
|
} |
|
|
|
// Validate delay is in range 1 to 10 seconds. |
|
if delay.Seconds() < 1 || delay.Seconds() > 10 { |
|
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) |
|
return |
|
} |
|
|
|
// Set name and validate value. |
|
name := r.FormValue("name") |
|
if name == "" { |
|
http.Error(w, "You must specify a name.", http.StatusBadRequest) |
|
return |
|
} |
|
|
|
// Create Job and push the work onto the jobQueue. |
|
job := Job{Name: name, Delay: delay} |
|
jobQueue <- job |
|
|
|
// Render success. |
|
w.WriteHeader(http.StatusCreated) |
|
} |
|
|
|
func main() { |
|
var ( |
|
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") |
|
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") |
|
port = flag.String("port", "8080", "The server port") |
|
) |
|
flag.Parse() |
|
|
|
// Create the job queue. |
|
jobQueue := make(chan Job, *maxQueueSize) |
|
|
|
// Start the dispatcher. |
|
dispatcher := NewDispatcher(jobQueue, *maxWorkers) |
|
dispatcher.run() |
|
|
|
// Start the HTTP handler. |
|
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { |
|
requestHandler(w, r, jobQueue) |
|
}) |
|
log.Fatal(http.ListenAndServe(":"+*port, nil)) |
|
} |