Skip to content

Instantly share code, notes, and snippets.

@aarjan
Last active August 22, 2018 11:11
Show Gist options
  • Save aarjan/babbf9b5a93cb216de36930eb9393ecd to your computer and use it in GitHub Desktop.
Save aarjan/babbf9b5a93cb216de36930eb9393ecd to your computer and use it in GitHub Desktop.
A concurrent worker thread pool model to handle POST request
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)
// Job represents a single instance of job to be done
type Job struct {
payload Payload
}
// Payload is an information to be transferred
type Payload struct {
Browser string `json:"browser"`
BrowserVersion int64 `json:"browser_version"`
City string `json:"city"`
CurrentURL string `json:"current_url"`
DistinctID string `json:"distinct_id"`
InitialReferrer string `json:"initial_referrer"`
InitialReferringDomain string `json:"initial_referring_domain"`
LibVersion string `json:"lib_version"`
MpCountryCode string `json:"mp_country_code"`
MpLib string `json:"mp_lib"`
Name string `json:"name"`
Os string `json:"os"`
Platform string `json:"platform"`
ScreenHeight int64 `json:"screen_height"`
ScreenWidth int64 `json:"screen_width"`
Timestamp int64 `json:"timestamp"`
}
// Worker represents a worker who carries out the task of handling the job
// Each work could consist a numerous amount of jobs
type Worker struct {
// workerPool is a pool of workers each consisting of a Job channel
workerPool chan chan Job
// jobQueue is for sending/recieving jobs
jobQueue chan Job
// quit is a channel for signalling end of the work
quit chan bool
}
// NewWorker instantiates a Worker with the given pool
func NewWorker(pool chan chan Job) *Worker {
return &Worker{
workerPool: pool,
jobQueue: make(chan Job),
quit: make(chan bool),
}
}
// Start starts the worker
// Registers the job queue to the worker pool
// Works on the job as the job is recieved in the registered job queue
func (w *Worker) Start() {
// It is a good practise to wrap the for loop in a goroutine,
// such that the loop would run concurrently
go func() {
for {
// registers the jobqueue to the worker pool
w.workerPool <- w.jobQueue
select {
// for each Job in the pool, do the work
case Job := <-w.jobQueue:
time.Sleep(10 * time.Millisecond)
fmt.Println(Job.payload)
// In receiving the quit signal, end the work
case <-w.quit:
return
}
}
}()
}
// Dispatcher dispatches the job to the different workers
type Dispatcher struct {
jobQueue chan Job
workerPool chan chan Job
}
// NewDispatcher initializes a new dispatcher with the maximum worker pool
func NewDispatcher(maxWorkers int, jobQueue chan Job) *Dispatcher {
workerPool := make(chan chan Job, maxWorkers)
return &Dispatcher{jobQueue, workerPool}
}
// Run starts each and every worker of the worker pool
func (d *Dispatcher) Run() {
for i := 0; i < cap(d.workerPool); i++ {
worker := NewWorker(d.workerPool)
worker.Start()
}
}
// Dispatch dispatches the job to the worker for each new request
func (d *Dispatcher) Dispatch() {
for {
select {
case job := <-d.jobQueue:
// For each new job request, get a jobQueujobQueue from the workerPool
// Dispatch the job to that jobQueujobQueue
go func(job Job) {
jobQueujobQueue := <-d.workerPool
jobQueujobQueue <- job
}(job)
}
}
}
func handleRequest(jobQueue chan Job) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/event" {
http.NotFound(w, r)
return
}
p := Payload{}
json.NewDecoder(r.Body).Decode(&p)
r.Body.Close()
work := Job{p}
jobQueue <- work
w.WriteHeader(http.StatusOK)
})
}
func main() {
// jobQueue is a channel for sending/recieving jobs
var jobQueue = make(chan Job, 1000)
dispatcher := NewDispatcher(1000, jobQueue)
dispatcher.Run()
go dispatcher.Dispatch()
log.Fatal(http.ListenAndServe(":8080", handleRequest(jobQueue)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment