Skip to content

Instantly share code, notes, and snippets.

@rybit
Created May 5, 2017 23:40
Show Gist options
  • Save rybit/5e4819e15bd1cc34a5f0842f02503a1b to your computer and use it in GitHub Desktop.
Save rybit/5e4819e15bd1cc34a5f0842f02503a1b to your computer and use it in GitHub Desktop.
worker pool with timed shutdown
// StartWorkerPoolWithTimedShutdown starts numWorkers workers that all receive
// from one shared, unbuffered work channel, and returns that channel together
// with a shutdown channel.
//
// Each worker is configured with its index as id, plus batchSize, endpoint,
// the shared work channel and a common cancellable context.
//
// To stop the pool, send the maximum time to wait on the shutdown channel.
// The shared context is then cancelled and the pool waits until every
// worker's start method has returned or the given duration has elapsed,
// whichever comes first. The shutdown channel is closed once that wait is
// over, so a caller can block on a receive from it to learn that shutdown has
// finished. Only a single shutdown request is handled.
//
// The work channel is never closed by this function.
func StartWorkerPoolWithTimedShutdown(batchSize, numWorkers int, endpoint string) (chan<- string, chan time.Duration) {
	externalShutdown := make(chan time.Duration)
	work := make(chan string)

	// The WaitGroup tracks running workers so shutdown can tell when they
	// have all returned.
	var wg sync.WaitGroup

	// Cancelling this context is the internal signal for workers to stop.
	ctx, cancel := context.WithCancel(context.Background())

	for i := 0; i < numWorkers; i++ {
		w := worker{
			id:        i,
			batchSize: batchSize,
			endpoint:  endpoint,
			incoming:  work,
			ctx:       ctx,
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.start()
		}()
	}

	go func() {
		// Block until the caller requests shutdown and tells us how long
		// the workers may take to finish.
		duration := <-externalShutdown

		// Tell every worker to stop.
		cancel()

		// ctx.Done() is already closed at this point, so it cannot be used
		// to detect that the workers have exited. Turn the WaitGroup into a
		// channel instead so it can be raced against the deadline.
		workersDone := make(chan struct{})
		go func() {
			// If the deadline fires first, this goroutine stays alive until
			// the remaining workers eventually return.
			wg.Wait()
			close(workersDone)
		}()

		timer := time.NewTimer(duration)
		defer timer.Stop()

		select {
		case <-workersDone:
			// Every worker returned within the grace period.
		case <-timer.C:
			fmt.Printf("Failed to shutdown all workers in time. I guess I'll die: timed out after %s\n", duration)
		}

		// Closing the channel tells the caller that shutdown is over.
		close(externalShutdown)
	}()

	return work, externalShutdown
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment