Last active
August 31, 2022 12:03
-
-
Save nathancyam/2d01efbc3f7a1ce1f0d6e375b118f8ff to your computer and use it in GitHub Desktop.
Naive worker pool with job drainer behaviour
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"os/signal" | |
"runtime" | |
"sync" | |
"syscall" | |
"time" | |
) | |
type workerManager struct { | |
jobCh chan string | |
poolSize int | |
close chan struct{} | |
} | |
func newWorkerManager(poolSize int) *workerManager { | |
done := make(chan struct{}) | |
jobCh := make(chan string) | |
return &workerManager{jobCh: jobCh, poolSize: poolSize, close: done} | |
} | |
func (w *workerManager) SendJob(j string) { | |
w.jobCh <- j | |
} | |
func (w *workerManager) StartWorkers() func() { | |
// Build a buffered channel to dispatch to all workers that they should shut down | |
finishedBufCh := make(chan func(), w.poolSize) | |
for i := 0; i < w.poolSize; i++ { | |
go w.bootWorker(i, finishedBufCh) | |
} | |
return func() { | |
// Use a callback that should be called that has proper shutdown | |
// semantics. It's easier to follow and understand. | |
var wg sync.WaitGroup | |
// Go through each the workers and build the message buffer. At the | |
// end of the pool size, this will be read by all workers, which | |
// allows the wait group Done to be called. Wait till every worker | |
// has called the callback | |
for i := 0; i < w.poolSize; i++ { | |
wg.Add(1) | |
finishedBufCh <- wg.Done | |
} | |
wg.Wait() | |
fmt.Println("all workers have exited, can cleanly exit") | |
} | |
} | |
func (w *workerManager) bootWorker(workerID int, onClose <-chan func()) { | |
defer func() { | |
if r := recover(); r != nil { | |
fmt.Println(fmt.Sprintf("panic: %v", r)) | |
} | |
}() | |
for { | |
// Either receive two messages from these channels; one | |
// to do actual work, the other to close. These messages | |
// are handled synchronously. | |
select { | |
case j := <-w.jobCh: | |
w.doWork(workerID, j) | |
case d := <-onClose: | |
fmt.Println(fmt.Sprintf("worker %d: received signal from parent context to exit", workerID)) | |
d() | |
fmt.Println(fmt.Sprintf("worker %d exited", workerID)) | |
return | |
} | |
} | |
} | |
func (w *workerManager) doWork(workerID int, j string) { | |
fmt.Println(fmt.Sprintf("worker %d: received job: %v", workerID, j)) | |
time.Sleep(5 * time.Second) | |
fmt.Println(fmt.Sprintf("worker %d: done job: %v", workerID, j)) | |
} | |
func main() { | |
defer func() { | |
fmt.Println("application successfully shutdown") | |
}() | |
sigs := make(chan os.Signal) | |
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) | |
mgr := newWorkerManager(runtime.NumCPU()) | |
t := time.NewTicker(200 * time.Millisecond) | |
done := make(chan struct{}) | |
// Start the worker pool. We get a shutdown function that has graceful | |
// shutdown semantics. | |
stopWorkers := mgr.StartWorkers() | |
defer stopWorkers() | |
go func() { | |
for { | |
select { | |
case ti := <-t.C: | |
fmt.Println("looping job ticker") | |
mgr.SendJob(ti.Format(time.RFC3339)) | |
case <-done: | |
// When this is reached, defer callback kicks in, stopping | |
// the user pool | |
return | |
} | |
} | |
}() | |
// Listen on OS signals. When we get one, the following steps occur: | |
sig := <-sigs | |
fmt.Println(fmt.Sprintf("received signal %v, closing application", sig)) | |
// Stop the timers, or the source of jobs. Since we put it in goroutine | |
// we also have to send a message to the done channel | |
fmt.Println("Stopping ticker") | |
t.Stop() | |
done <- struct{}{} | |
fmt.Println("Waiting for worker pools to be closed...") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment