Skip to content

Instantly share code, notes, and snippets.

@digorithm

digorithm/withContextCancellation.go Secret

Last active Oct 31, 2020
Embed
What would you like to do?
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"log"
"github.com/gorilla/mux"
)
func slowJob1(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 1 for %s\n", name)
time.Sleep(5 * time.Second)
fmt.Printf("finished job 1 for %s\n", name)
}
func slowJob2(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 2 for %s\n", name)
time.Sleep(4 * time.Second)
fmt.Printf("finished job 2 for %s\n", name)
}
func slowJob3(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 3 for %s\n", name)
time.Sleep(3 * time.Second)
fmt.Printf("finished job 3 for %s\n", name)
}
func consumer(ctx context.Context, jobQueue chan string, doneChan chan interface{}) {
wg := &sync.WaitGroup{}
for {
select {
// If the context was cancelled, a SIGTERM was captured
// So we wait for the jobs to finish, write to the done channel and return
case <-ctx.Done():
// Note that the waiting time here is unbounded and can take a long time.
// If that's an issue you can:
// (1) issue a SIGKILL after a certain time or
// (2) use a context with timeout
wg.Wait()
fmt.Println("writing to done channel")
doneChan <- struct{}{}
log.Println("Done, shutting down the consumer")
return
case job := <-jobQueue:
wg.Add(3)
go slowJob1(job, wg)
go slowJob2(job, wg)
go slowJob3(job, wg)
}
}
}
// Our custom handler that holds a wait group used to block the shutdown while
// it's running the jobs.
type CustomHandler struct {
jobQueue chan string
}
func NewCustomHandler(jobQueue chan string) *CustomHandler {
// You can check for wg == nil if feeling paranoid
return &CustomHandler{jobQueue: jobQueue}
}
func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
jobName := vars["jobName"]
h.jobQueue <- jobName
fmt.Fprintf(w, "job %s started", jobName)
}
func main() {
jobQueue := make(chan string)
customHandler := NewCustomHandler(jobQueue)
ctx, cancel := context.WithCancel(context.Background())
router := mux.NewRouter()
router.Handle("/{jobName}", customHandler)
httpServer := &http.Server{
Addr: ":8080",
Handler: router,
}
// Handle sigterm and await termChan signal
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
if err := httpServer.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Printf("HTTP server closed with: %v\n", err)
}
log.Printf("HTTP server shut down")
}
}()
// doneChan will be the channel we'll be listening on
// to know all already started jobs have finished
// before we actually exit the program
doneChan := make(chan interface{})
go consumer(ctx, jobQueue, doneChan)
// Wait for SIGTERM to be captured
<-termChan
log.Println("SIGTERM received. Shutdown process initiated")
// Shutdown the HTTP server
if err := httpServer.Shutdown(ctx); err != nil {
log.Fatalf("Server Shutdown Failed:%+v", err)
}
// Cancel the context, this will make the consumer stop
cancel()
// Wait for the consumer's jobs to finish
log.Println("waiting consumer to finish its jobs")
<-doneChan
log.Println("done. returning.")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment