Skip to content

Instantly share code, notes, and snippets.

@yuroyoro
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yuroyoro/e107c0c946311e3c7a41 to your computer and use it in GitHub Desktop.
Save yuroyoro/e107c0c946311e3c7a41 to your computer and use it in GitHub Desktop.
[Writing worker queues, in Go](http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html) の 写経
package main
import (
"fmt"
"net/http"
"time"
)
// A buffered channel that we can send work requests on
var WorkQueue = make(chan WorkRequest, 100)
func Collector(w http.ResponseWriter, r *http.Request) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the Delay
delay, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Check to make sure the delay is anaywhere from 1 to 10 seconds.
if delay.Seconds() < 1 || delay.Seconds() > 10 {
http.Error(w, "The delay must be beteween 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Now, we retrieve the person's name from the request
name := r.FormValue("name")
// Just do a quick bit of sanity checking to make sure the client actuallyprovided us with a name.
if name == "" {
http.Error(w, "You msut specify a name", http.StatusBadRequest)
return
}
// Now, we take the delay, and the persion's name, and make a WorkRequest out of them
work := WorkRequest{Name: name, Delay: delay}
// Push the work onto the queue.
WorkQueue <- work
fmt.Println("Work request queued")
// And let the user know their work request was created.
w.WriteHeader(http.StatusCreated)
return
}
package main
import "fmt"
var WorkerQueue chan Worker
func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the worker's work channels into.
WorkerQueue = make(chan Worker, nworkers)
// Now, create all of our workers
for i := 0; i < nworkers; i++ {
fmt.Println("Starting worker", i+1)
worker := NewWokrer(i+1, WorkerQueue)
worker.Start()
}
go func() {
for {
select {
case work := <-WorkQueue:
fmt.Println("Received work request")
go func() {
worker := <-WorkerQueue
fmt.Printf("Dispatching work request to worker%d\n", worker.ID)
worker.Work <- work
}()
}
}
}()
}
package main
import (
"flag"
"fmt"
"net/http"
)
var (
NWorkers = flag.Int("n", 4, "The number of workers to start")
HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)
func main() {
// Parse the command-line flags.
flag.Parse()
// Start the dispatcher.
fmt.Println("Starting the dispatcher")
StartDispatcher(*NWorkers)
// Register our collector as an HTTP handler fucntion.
fmt.Println("Registering the collector")
http.HandleFunc("/work", Collector)
// Start the HTTP server!
fmt.Println("HTTP server listening on ", *HTTPAddr)
if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
fmt.Println(err.Error())
}
}
package main
import "time"
type WorkRequest struct {
Name string
Delay time.Duration
}
package main
import (
"fmt"
"time"
)
// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenerver it is done its
// work.
func NewWokrer(id int, workerQueue chan Worker) Worker {
// Create, and return the worker
worker := Worker{
ID: id,
Work: make(chan WorkRequest),
WorkerQueue: workerQueue,
QuitChan: make(chan bool)}
return worker
}
type Worker struct {
ID int
Work chan WorkRequest
WorkerQueue chan Worker
QuitChan chan bool
}
// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w Worker) Start() {
go func() {
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w
select {
case work := <-w.Work:
// Recieve a work request
fmt.Printf("workder%d: Recieved work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())
time.Sleep(work.Delay)
fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)
case <-w.QuitChan:
// We have been asked to stop.
fmt.Printf("worker%d stopping\n", w.ID)
return
}
}
}()
}
// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment