Last active
August 22, 2018 11:11
-
-
Save aarjan/babbf9b5a93cb216de36930eb9393ecd to your computer and use it in GitHub Desktop.
A concurrent worker thread pool model to handle POST request
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"log" | |
"net/http" | |
"time" | |
) | |
// Job represents a single instance of job to be done | |
type Job struct { | |
payload Payload | |
} | |
// Payload is an information to be transferred | |
type Payload struct { | |
Browser string `json:"browser"` | |
BrowserVersion int64 `json:"browser_version"` | |
City string `json:"city"` | |
CurrentURL string `json:"current_url"` | |
DistinctID string `json:"distinct_id"` | |
InitialReferrer string `json:"initial_referrer"` | |
InitialReferringDomain string `json:"initial_referring_domain"` | |
LibVersion string `json:"lib_version"` | |
MpCountryCode string `json:"mp_country_code"` | |
MpLib string `json:"mp_lib"` | |
Name string `json:"name"` | |
Os string `json:"os"` | |
Platform string `json:"platform"` | |
ScreenHeight int64 `json:"screen_height"` | |
ScreenWidth int64 `json:"screen_width"` | |
Timestamp int64 `json:"timestamp"` | |
} | |
// Worker represents a worker who carries out the task of handling the job | |
// Each work could consist a numerous amount of jobs | |
type Worker struct { | |
// workerPool is a pool of workers each consisting of a Job channel | |
workerPool chan chan Job | |
// jobQueue is for sending/recieving jobs | |
jobQueue chan Job | |
// quit is a channel for signalling end of the work | |
quit chan bool | |
} | |
// NewWorker instantiates a Worker with the given pool | |
func NewWorker(pool chan chan Job) *Worker { | |
return &Worker{ | |
workerPool: pool, | |
jobQueue: make(chan Job), | |
quit: make(chan bool), | |
} | |
} | |
// Start starts the worker | |
// Registers the job queue to the worker pool | |
// Works on the job as the job is recieved in the registered job queue | |
func (w *Worker) Start() { | |
// It is a good practise to wrap the for loop in a goroutine, | |
// such that the loop would run concurrently | |
go func() { | |
for { | |
// registers the jobqueue to the worker pool | |
w.workerPool <- w.jobQueue | |
select { | |
// for each Job in the pool, do the work | |
case Job := <-w.jobQueue: | |
time.Sleep(10 * time.Millisecond) | |
fmt.Println(Job.payload) | |
// In receiving the quit signal, end the work | |
case <-w.quit: | |
return | |
} | |
} | |
}() | |
} | |
// Dispatcher dispatches the job to the different workers | |
type Dispatcher struct { | |
jobQueue chan Job | |
workerPool chan chan Job | |
} | |
// NewDispatcher initializes a new dispatcher with the maximum worker pool | |
func NewDispatcher(maxWorkers int, jobQueue chan Job) *Dispatcher { | |
workerPool := make(chan chan Job, maxWorkers) | |
return &Dispatcher{jobQueue, workerPool} | |
} | |
// Run starts each and every worker of the worker pool | |
func (d *Dispatcher) Run() { | |
for i := 0; i < cap(d.workerPool); i++ { | |
worker := NewWorker(d.workerPool) | |
worker.Start() | |
} | |
} | |
// Dispatch dispatches the job to the worker for each new request | |
func (d *Dispatcher) Dispatch() { | |
for { | |
select { | |
case job := <-d.jobQueue: | |
// For each new job request, get a jobQueujobQueue from the workerPool | |
// Dispatch the job to that jobQueujobQueue | |
go func(job Job) { | |
jobQueujobQueue := <-d.workerPool | |
jobQueujobQueue <- job | |
}(job) | |
} | |
} | |
} | |
func handleRequest(jobQueue chan Job) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
if r.URL.Path != "/event" { | |
http.NotFound(w, r) | |
return | |
} | |
p := Payload{} | |
json.NewDecoder(r.Body).Decode(&p) | |
r.Body.Close() | |
work := Job{p} | |
jobQueue <- work | |
w.WriteHeader(http.StatusOK) | |
}) | |
} | |
func main() { | |
// jobQueue is a channel for sending/recieving jobs | |
var jobQueue = make(chan Job, 1000) | |
dispatcher := NewDispatcher(1000, jobQueue) | |
dispatcher.Run() | |
go dispatcher.Dispatch() | |
log.Fatal(http.ListenAndServe(":8080", handleRequest(jobQueue))) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment