-
-
Save brontolinux/4de3330c93128eb706d5ad2a879bff7c to your computer and use it in GitHub Desktop.
Dispatching jobs to workers in Golang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import "fmt"       // to print stuff
import "sync"      // to sync goroutines with WaitGroup
import "time"      // to sleep randomly
import "math/rand" // to create random numbers
import "math"      // to compute how much concurrency we can use
import "runtime"   // ditto
const (
	// q is the divisor: workers report whether each number they receive
	// is a multiple of q.
	q = 17

	// tests is the maximum number of values main dispatches to the workers.
	tests = 100

	// maxSleep is the exclusive upper bound, in seconds, of the random
	// pause a worker takes after reporting a result.
	maxSleep = 5
)

// concurrency is the number of workers to start: one fewer than the number
// of CPUs, but never less than one. math.Max works on float64 while
// runtime.NumCPU returns an int, hence the conversions in both directions.
var concurrency = int(math.Max(float64(runtime.NumCPU()-1), 1.0))
func main() { | |
// create a WaitGroup, so that later you can wait until all | |
// workers have shut down when they are done | |
var wg sync.WaitGroup | |
// flag to signal the program that it must stop dispatching | |
// numbers to the workers; the dispatching cycle only reads | |
// this variable, and the reaper is the only thing that | |
// writes to it, so there should be no need to use mutex and | |
// locking in general | |
var stopDispatching bool | |
// queue to dispatch the data to the workers | |
dataQ := make(chan int) | |
// queue where the workers report their findings | |
resultsQ := make(chan bool) | |
// worker takes an integer "ID", so that it can identify itself | |
// in messages. It reads an integer from the dataQ and checks if | |
// it's a multiple of q. If it is, it will send a true value into | |
// resultsQ, otherwise it will send a false value. In any case, | |
// it reports the result. The worker terminates when the dataQ | |
// is closed. | |
worker := func(i int) { | |
fmt.Println("Starting worker", i) | |
// add yourself into the WaitGroup and defer the call to | |
// Done() to when the worker terminates | |
wg.Add(1) | |
defer wg.Done() | |
// read from dataQ | |
for n := range dataQ { | |
var msg string | |
var result bool | |
if n%q == 0 { | |
msg = "" | |
result = true | |
} else { | |
msg = "n't" | |
result = false | |
} | |
// calculate a random number of seconds to pause | |
// between 0 and maxSleep, then report | |
sleepTime := rand.Int63n(maxSleep) | |
fmt.Printf("Worker %d: %d is%s a multiple of %d [sleeping %d seconds]\n", i, n, msg, q, sleepTime) | |
// send the result to resultQ, sleep and iterate | |
resultsQ <- result | |
time.Sleep(time.Second * time.Duration(sleepTime)) | |
} | |
// if we get here, dataQ was closed | |
fmt.Printf("Worker %d shutting down\n", i) | |
} | |
// reaper is the goroutine that reads from resultsQ and reports; | |
// when a true value is received, the flag stopDispatching is | |
// activated so that the main program knows that it should | |
// stop sending data through dataQ and close it, so that the | |
// workers can terminate | |
reaper := func() { | |
for result := range resultsQ { | |
if result { | |
fmt.Println("Found a multiple of", q) | |
stopDispatching = true | |
} | |
} | |
// if we get here, resultQ was closed | |
fmt.Println("Stopping reaper") | |
} | |
// start some workers | |
for i := 1; i <= concurrency; i++ { | |
go worker(i) | |
} | |
// start the reaper | |
go reaper() | |
// start sending data to dataQ, either for "tests" times, or | |
// until the flag stopDispatching is raised; when that happens, | |
// break out of the cycle and start the shutdown | |
for i := 1; i <= tests; i++ { | |
if stopDispatching { | |
break | |
} | |
fmt.Printf("Sending datum %d to dataQ\n", i) | |
dataQ <- rand.Intn(10000) | |
} | |
// shut down the workers by closing the dataQ; there is nothing | |
// left sending data to dataQ so this won't cause any panic; | |
fmt.Println("Closing dataQ") | |
close(dataQ) | |
// you can't close resultsQ as long as some workers are still | |
// alive, if you do they will cause a panic when they try to send | |
// to resultsQ and find it closed; so we sit and wait for all | |
// workers to terminate | |
fmt.Println("Waiting for workers to terminate...") | |
wg.Wait() | |
// all workers are now dead; we can close resultsQ now, which | |
// will cause the reaper to shut down finally; that will end | |
// the program. | |
fmt.Println("Closing resultsQ") | |
close(resultsQ) | |
fmt.Println("Clean shutdown reached") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment