Skip to content

Instantly share code, notes, and snippets.


digorithm/withContextCancellation.go Secret

Last active Oct 31, 2020
What would you like to do?
package main
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/gorilla/mux"
)

// slowJob1 simulates a long-running job for name: it prints a start message,
// sleeps for five seconds and prints a finish message.
//
// wg.Done is deferred, so the caller must have accounted for this call with
// wg.Add before starting it.
func slowJob1(name string, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Printf("starting job 1 for %s\n", name)
	time.Sleep(5 * time.Second)
	fmt.Printf("finished job 1 for %s\n", name)
}

// slowJob2 simulates a long-running job for name: it prints a start message,
// sleeps for four seconds and prints a finish message.
//
// wg.Done is deferred, so the caller must have accounted for this call with
// wg.Add before starting it.
func slowJob2(name string, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Printf("starting job 2 for %s\n", name)
	time.Sleep(4 * time.Second)
	fmt.Printf("finished job 2 for %s\n", name)
}

// slowJob3 simulates a long-running job for name: it prints a start message,
// sleeps for three seconds and prints a finish message.
//
// wg.Done is deferred, so the caller must have accounted for this call with
// wg.Add before starting it.
func slowJob3(name string, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Printf("starting job 3 for %s\n", name)
	time.Sleep(3 * time.Second)
	fmt.Printf("finished job 3 for %s\n", name)
}

func consumer(ctx context.Context, jobQueue chan string, doneChan chan interface{}) {
wg := &sync.WaitGroup{}
for {
select {
// If the context was cancelled, a SIGTERM was captured
// So we wait for the jobs to finish, write to the done channel and return
case <-ctx.Done():
// Note that the waiting time here is unbounded and can take a long time.
// If that's an issue you can:
// (1) issue a SIGKILL after a certain time or
// (2) use a context with timeout
fmt.Println("writing to done channel")
doneChan <- struct{}{}
log.Println("Done, shutting down the consumer")
case job := <-jobQueue:
go slowJob1(job, wg)
go slowJob2(job, wg)
go slowJob3(job, wg)
// CustomHandler is our custom HTTP handler. It holds the channel on which the
// names of requested jobs are queued.
type CustomHandler struct {
	// jobQueue carries job names from the handler to whoever receives from
	// the channel.
	jobQueue chan string
}

func NewCustomHandler(jobQueue chan string) *CustomHandler {
// You can check for wg == nil if feeling paranoid
return &CustomHandler{jobQueue: jobQueue}
func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
jobName := vars["jobName"]
h.jobQueue <- jobName
fmt.Fprintf(w, "job %s started", jobName)
func main() {
jobQueue := make(chan string)
customHandler := NewCustomHandler(jobQueue)
ctx, cancel := context.WithCancel(context.Background())
router := mux.NewRouter()
router.Handle("/{jobName}", customHandler)
httpServer := &http.Server{
Addr: ":8080",
Handler: router,
// Handle sigterm and await termChan signal
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
if err := httpServer.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Printf("HTTP server closed with: %v\n", err)
log.Printf("HTTP server shut down")
// doneChan will be the channel we'll be listening on
// to know all already started jobs have finished
// before we actually exit the program
doneChan := make(chan interface{})
go consumer(ctx, jobQueue, doneChan)
// Wait for SIGTERM to be captured
log.Println("SIGTERM received. Shutdown process initiated")
// Shutdown the HTTP server
if err := httpServer.Shutdown(ctx); err != nil {
log.Fatalf("Server Shutdown Failed:%+v", err)
// Cancel the context, this will make the consumer stop
// Wait for the consumer's jobs to finish
log.Println("waiting consumer to finish its jobs")
log.Println("done. returning.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment