@nathancyam · Last active August 31, 2022 12:03
Naive worker pool with job drainer behaviour
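The Go program below starts a pool of runtime.NumCPU() workers, feeds them a job every 200ms from a ticker, and listens for SIGINT/SIGTERM. On a signal it stops the job source first, then asks each worker to exit. Because a worker handles jobs and close messages in the same select loop, it drains its in-flight job before shutting down, and the process only exits once every worker has acknowledged the close message.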
package main

import (
	"fmt"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"
)
type workerManager struct {
	jobCh    chan string
	poolSize int
	close    chan struct{} // note: assigned in the constructor but never read in this example
}

func newWorkerManager(poolSize int) *workerManager {
	done := make(chan struct{})
	jobCh := make(chan string)
	return &workerManager{jobCh: jobCh, poolSize: poolSize, close: done}
}

func (w *workerManager) SendJob(j string) {
	w.jobCh <- j
}
func (w *workerManager) StartWorkers() func() {
	// Build a buffered channel used to tell every worker to shut down.
	finishedBufCh := make(chan func(), w.poolSize)
	for i := 0; i < w.poolSize; i++ {
		go w.bootWorker(i, finishedBufCh)
	}
	// Return a callback with proper shutdown semantics; a single function
	// is easier to follow and understand than exposing the channel.
	return func() {
		var wg sync.WaitGroup
		// Queue one shutdown message per worker. Each worker that receives
		// one calls the wait group's Done callback, and Wait blocks until
		// every worker has done so.
		for i := 0; i < w.poolSize; i++ {
			wg.Add(1)
			finishedBufCh <- wg.Done
		}
		wg.Wait()
		fmt.Println("all workers have exited, can cleanly exit")
	}
}
func (w *workerManager) bootWorker(workerID int, onClose <-chan func()) {
	defer func() {
		if r := recover(); r != nil {
			// Note: a worker that panics exits without ever receiving a
			// Done callback from onClose, so the drain in StartWorkers
			// would block forever. Acceptable for this naive example.
			fmt.Printf("panic: %v\n", r)
		}
	}()
	for {
		// Receive one of two messages: a job to work on, or an instruction
		// to close. They are handled synchronously, so a worker only shuts
		// down between jobs, never mid-job.
		select {
		case j := <-w.jobCh:
			w.doWork(workerID, j)
		case d := <-onClose:
			fmt.Printf("worker %d: received signal from parent context to exit\n", workerID)
			d()
			fmt.Printf("worker %d exited\n", workerID)
			return
		}
	}
}
func (w *workerManager) doWork(workerID int, j string) {
	fmt.Printf("worker %d: received job: %v\n", workerID, j)
	time.Sleep(5 * time.Second)
	fmt.Printf("worker %d: done job: %v\n", workerID, j)
}
func main() {
	defer func() {
		fmt.Println("application successfully shutdown")
	}()
	// signal.Notify requires a buffered channel; with an unbuffered one a
	// signal delivered before we are ready to receive can be dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	mgr := newWorkerManager(runtime.NumCPU())
	t := time.NewTicker(200 * time.Millisecond)
	done := make(chan struct{})
	// Start the worker pool. We get back a shutdown function with graceful
	// shutdown semantics.
	stopWorkers := mgr.StartWorkers()
	defer stopWorkers()
	go func() {
		for {
			select {
			case ti := <-t.C:
				fmt.Println("looping job ticker")
				mgr.SendJob(ti.Format(time.RFC3339))
			case <-done:
				// Once this is reached, the job producer stops; main then
				// returns and the deferred stopWorkers drains the pool.
				return
			}
		}
	}()
	// Block on OS signals. When one arrives, the following steps occur:
	sig := <-sigs
	fmt.Printf("received signal %v, closing application\n", sig)
	// Stop the ticker, i.e. the source of jobs. Since the producer runs in
	// its own goroutine, we also have to send a message on the done channel.
	fmt.Println("Stopping ticker")
	t.Stop()
	done <- struct{}{}
	fmt.Println("Waiting for worker pools to be closed...")
}
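For reference, a run on a four-core machine interrupted with Ctrl-C after roughly half a second might print something like the following (timestamps are illustrative, and worker IDs, interleaving, and counts vary by machine and run). Idle workers exit immediately, while busy workers finish their 5-second job before picking up the close message:

    looping job ticker
    worker 3: received job: 2022-08-31T12:03:01Z
    looping job ticker
    worker 0: received job: 2022-08-31T12:03:01Z
    received signal interrupt, closing application
    Stopping ticker
    Waiting for worker pools to be closed...
    worker 1: received signal from parent context to exit
    worker 1 exited
    worker 2: received signal from parent context to exit
    worker 2 exited
    worker 3: done job: 2022-08-31T12:03:01Z
    worker 3: received signal from parent context to exit
    worker 3 exited
    worker 0: done job: 2022-08-31T12:03:01Z
    worker 0: received signal from parent context to exit
    worker 0 exited
    all workers have exited, can cleanly exit
    application successfully shutdown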