Skip to content

Instantly share code, notes, and snippets.

@brontolinux brontolinux/parallel.go Secret
Last active Oct 3, 2018

What would you like to do?
Dispatching jobs to workers in Golang
package main
import "fmt" // to print stuff
import "sync" // to sync goroutines with WaitGroup
import "time" // to sleep randomly
import "math/rand" // to create random numbers
import "math" // to compute how much concurrency we can use
import "runtime" // ditto
const q = 17
const tests = 100
const maxSleep = 5
// math.Max takes floats, runtime.NumCPU returns integers, concurrency
// must be an integer... type conversions all over the place, pfffff...
var concurrency = int(math.Max(float64(runtime.NumCPU()-1), 1.0))
func main() {
// create a WaitGroup, so that later you can wait until all
// workers have shut down when they are done
var wg sync.WaitGroup
// flag to signal the program that it must stop dispatching
// numbers to the workers; the dispatching cycle only reads
// this variable, and the reaper is the only thing that
// writes to it, so there should be no need to use mutex and
// locking in general
var stopDispatching bool
// queue to dispatch the data to the workers
dataQ := make(chan int)
// queue where the workers report their findings
resultsQ := make(chan bool)
// worker takes an integer "ID", so that it can identify itself
// in messages. It reads an integer from the dataQ and checks if
// it's a multiple of q. If it is, it will send a true value into
// resultsQ, otherwise it will send a false value. In any case,
// it reports the result. The worker terminates when the dataQ
// is closed.
worker := func(i int) {
fmt.Println("Starting worker", i)
// add yourself into the WaitGroup and defer the call to
// Done() to when the worker terminates
defer wg.Done()
// read from dataQ
for n := range dataQ {
var msg string
var result bool
if n%q == 0 {
msg = ""
result = true
} else {
msg = "n't"
result = false
// calculate a random number of seconds to pause
// between 0 and maxSleep, then report
sleepTime := rand.Int63n(maxSleep)
fmt.Printf("Worker %d: %d is%s a multiple of %d [sleeping %d seconds]\n", i, n, msg, q, sleepTime)
// send the result to resultQ, sleep and iterate
resultsQ <- result
time.Sleep(time.Second * time.Duration(sleepTime))
// if we get here, dataQ was closed
fmt.Printf("Worker %d shutting down\n", i)
// reaper is the goroutine that reads from resultsQ and reports;
// when a true value is received, the flag stopDispatching is
// activated so that the main program knows that it should
// stop sending data through dataQ and close it, so that the
// workers can terminate
reaper := func() {
for result := range resultsQ {
if result {
fmt.Println("Found a multiple of", q)
stopDispatching = true
// if we get here, resultQ was closed
fmt.Println("Stopping reaper")
// start some workers
for i := 1; i <= concurrency; i++ {
go worker(i)
// start the reaper
go reaper()
// start sending data to dataQ, either for "tests" times, or
// until the flag stopDispatching is raised; when that happens,
// break out of the cycle and start the shutdown
for i := 1; i <= tests; i++ {
if stopDispatching {
fmt.Printf("Sending datum %d to dataQ\n", i)
dataQ <- rand.Intn(10000)
// shut down the workers by closing the dataQ; there is nothing
// left sending data to dataQ so this won't cause any panic;
fmt.Println("Closing dataQ")
// you can't close resultsQ as long as some workers are still
// alive, if you do they will cause a panic when they try to send
// to resultsQ and find it closed; so we sit and wait for all
// workers to terminate
fmt.Println("Waiting for workers to terminate...")
// all workers are now dead; we can close resultsQ now, which
// will cause the reaper to shut down finally; that will end
// the program.
fmt.Println("Closing resultsQ")
fmt.Println("Clean shutdown reached")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.