Skip to content

Instantly share code, notes, and snippets.

@brontolinux
Last active October 3, 2018 15:19
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save brontolinux/4de3330c93128eb706d5ad2a879bff7c to your computer and use it in GitHub Desktop.
Save brontolinux/4de3330c93128eb706d5ad2a879bff7c to your computer and use it in GitHub Desktop.
Dispatching jobs to workers in Golang
package main
import "fmt" // to print stuff
import "sync" // to sync goroutines with WaitGroup
import "time" // to sleep randomly
import "math/rand" // to create random numbers
import "math" // to compute how much concurrency we can use
import "runtime" // ditto
const q = 17
const tests = 100
const maxSleep = 5
// math.Max takes floats, runtime.NumCPU returns integers, concurrency
// must be an integer... type conversions all over the place, pfffff...
var concurrency = int(math.Max(float64(runtime.NumCPU()-1), 1.0))
func main() {
// create a WaitGroup, so that later you can wait until all
// workers have shut down when they are done
var wg sync.WaitGroup
// flag to signal the program that it must stop dispatching
// numbers to the workers; the dispatching cycle only reads
// this variable, and the reaper is the only thing that
// writes to it, so there should be no need to use mutex and
// locking in general
var stopDispatching bool
// queue to dispatch the data to the workers
dataQ := make(chan int)
// queue where the workers report their findings
resultsQ := make(chan bool)
// worker takes an integer "ID", so that it can identify itself
// in messages. It reads an integer from the dataQ and checks if
// it's a multiple of q. If it is, it will send a true value into
// resultsQ, otherwise it will send a false value. In any case,
// it reports the result. The worker terminates when the dataQ
// is closed.
worker := func(i int) {
fmt.Println("Starting worker", i)
// add yourself into the WaitGroup and defer the call to
// Done() to when the worker terminates
wg.Add(1)
defer wg.Done()
// read from dataQ
for n := range dataQ {
var msg string
var result bool
if n%q == 0 {
msg = ""
result = true
} else {
msg = "n't"
result = false
}
// calculate a random number of seconds to pause
// between 0 and maxSleep, then report
sleepTime := rand.Int63n(maxSleep)
fmt.Printf("Worker %d: %d is%s a multiple of %d [sleeping %d seconds]\n", i, n, msg, q, sleepTime)
// send the result to resultQ, sleep and iterate
resultsQ <- result
time.Sleep(time.Second * time.Duration(sleepTime))
}
// if we get here, dataQ was closed
fmt.Printf("Worker %d shutting down\n", i)
}
// reaper is the goroutine that reads from resultsQ and reports;
// when a true value is received, the flag stopDispatching is
// activated so that the main program knows that it should
// stop sending data through dataQ and close it, so that the
// workers can terminate
reaper := func() {
for result := range resultsQ {
if result {
fmt.Println("Found a multiple of", q)
stopDispatching = true
}
}
// if we get here, resultQ was closed
fmt.Println("Stopping reaper")
}
// start some workers
for i := 1; i <= concurrency; i++ {
go worker(i)
}
// start the reaper
go reaper()
// start sending data to dataQ, either for "tests" times, or
// until the flag stopDispatching is raised; when that happens,
// break out of the cycle and start the shutdown
for i := 1; i <= tests; i++ {
if stopDispatching {
break
}
fmt.Printf("Sending datum %d to dataQ\n", i)
dataQ <- rand.Intn(10000)
}
// shut down the workers by closing the dataQ; there is nothing
// left sending data to dataQ so this won't cause any panic;
fmt.Println("Closing dataQ")
close(dataQ)
// you can't close resultsQ as long as some workers are still
// alive, if you do they will cause a panic when they try to send
// to resultsQ and find it closed; so we sit and wait for all
// workers to terminate
fmt.Println("Waiting for workers to terminate...")
wg.Wait()
// all workers are now dead; we can close resultsQ now, which
// will cause the reaper to shut down finally; that will end
// the program.
fmt.Println("Closing resultsQ")
close(resultsQ)
fmt.Println("Clean shutdown reached")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment