Skip to content

Instantly share code, notes, and snippets.

@digorithm

digorithm/jobqueue.go Secret

Created Oct 27, 2020
Embed
What would you like to do?
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"log"
"github.com/gorilla/mux"
)
func slowJob1(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 1 for %s\n", name)
time.Sleep(5 * time.Second)
fmt.Printf("finished job 1 for %s\n", name)
}
func slowJob2(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 2 for %s\n", name)
time.Sleep(4 * time.Second)
fmt.Printf("finished job 2 for %s\n", name)
}
func slowJob3(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 3 for %s\n", name)
time.Sleep(3 * time.Second)
fmt.Printf("finished job 3 for %s\n", name)
}
func consumer(jobQueue chan string) {
wg := &sync.WaitGroup{}
for job := range jobQueue {
wg.Add(3)
go slowJob1(job, wg)
go slowJob2(job, wg)
go slowJob3(job, wg)
}
log.Println("Waiting for running jobs to finish")
wg.Wait()
log.Println("Done, shutting down the consumer")
}
// Our custom handler that holds a wait group used to block the shutdown while
// it's running the jobs.
type CustomHandler struct {
jobQueue chan string
}
func NewCustomHandler(jobQueue chan string) *CustomHandler {
// You can check for wg == nil if feeling paranoid
return &CustomHandler{jobQueue: jobQueue}
}
func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
jobName := vars["jobName"]
h.jobQueue <- jobName
fmt.Fprintf(w, "job %s started", jobName)
}
func main() {
jobQueue := make(chan string)
customHandler := NewCustomHandler(jobQueue)
router := mux.NewRouter()
router.Handle("/{jobName}", customHandler)
httpServer := &http.Server{
Addr: ":8080",
Handler: router,
}
// Handle sigterm and await termChan signal
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-termChan // Blocks here until interrupted
log.Println("SIGTERM received. Shutdown process initiated")
log.Println("stopping the consumer")
close(jobQueue) // This will force the consumer to stop its main loop
httpServer.Shutdown(context.Background())
}()
go func() {
if err := httpServer.ListenAndServe(); err != nil {
if err.Error() != "http: Server closed" {
log.Printf("HTTP server closed with: %v\n", err)
}
log.Printf("HTTP server shut down")
}
}()
// Now the consumer is the blocking part
consumer(jobQueue)
}
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"log"
"github.com/gorilla/mux"
)
func slowJob1(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 1 for %s\n", name)
time.Sleep(5 * time.Second)
fmt.Printf("finished job 1 for %s\n", name)
}
func slowJob2(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 2 for %s\n", name)
time.Sleep(4 * time.Second)
fmt.Printf("finished job 2 for %s\n", name)
}
func slowJob3(name string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("starting job 3 for %s\n", name)
time.Sleep(3 * time.Second)
fmt.Printf("finished job 3 for %s\n", name)
}
// Our custom handler that holds a wait group used to block the shutdown while
// it's running the jobs.
type CustomHandler struct {
wg *sync.WaitGroup
}
func NewCustomHandler(wg *sync.WaitGroup) *CustomHandler {
// You can check for wg == nil if feeling paranoid
return &CustomHandler{wg: wg}
}
func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
jobName := vars["jobName"]
fmt.Fprintf(w, "job %s started", jobName)
h.wg.Add(3)
go slowJob1(jobName, h.wg)
go slowJob2(jobName, h.wg)
go slowJob3(jobName, h.wg)
}
func main() {
wg := &sync.WaitGroup{}
customHandler := NewCustomHandler(wg)
router := mux.NewRouter()
router.Handle("/{jobName}", customHandler)
httpServer := &http.Server{
Addr: ":8080",
Handler: router,
}
// Handle sigterm and await termChan signal
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-termChan // Blocks here until interrupted
log.Print("SIGTERM received. Shutdown process initiated\n")
httpServer.Shutdown(context.Background())
}()
// Blocking
if err := httpServer.ListenAndServe(); err != nil {
if err.Error() != "http: Server closed" {
log.Printf("HTTP server closed with: %v\n", err)
}
log.Printf("HTTP server shut down")
}
// This is where, once we're closing the program, we wait for all
// jobs (they all have been added to this WaitGroup) to `wg.Done()`.
log.Println("waiting for running jobs to finish")
wg.Wait()
log.Println("jobs finished. exiting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment