Skip to content

Instantly share code, notes, and snippets.

@kazufusa
Last active September 17, 2020 10:27
Show Gist options
  • Save kazufusa/df519bd64da746b46165104da5fadf3f to your computer and use it in GitHub Desktop.
Save kazufusa/df519bd64da746b46165104da5fadf3f to your computer and use it in GitHub Desktop.
Go - job queue, worker, and graceful shutdown
// Go - job queue, worker, and graceful shutdown
package main
import (
"context"
"errors"
"log"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
// IJob is a unit of work that can be queued on a Worker.
type IJob interface {
	// Execute performs the job. It is called from a runner goroutine.
	Execute()
}

// jobBufferSize is the capacity of a Worker's queue. Add blocks while this
// many jobs are already waiting to be picked up.
const jobBufferSize = 1000

var (
	// ErrShuttingDown is returned by Add once Shutdown has been called.
	ErrShuttingDown = errors.New("can't add task in shutting down")
	// ErrNilJob is returned by Add when it is handed a nil job.
	ErrNilJob = errors.New("can't add nil job")
)

// Worker is a buffered job queue drained by one or more runner goroutines.
// Create it with NewWorker; the zero value is not usable.
type Worker struct {
	// mu makes "check the shutdown flag, then wg.Add(1)" in Add atomic with
	// respect to the flag flip in Shutdown, so no job is counted after
	// Shutdown has started waiting.
	mu sync.Mutex
	// wg counts jobs that were accepted by Add and have not finished yet.
	wg        sync.WaitGroup
	jobBuffer chan IJob
	// inShutdown is non-zero once Shutdown has been called. It is accessed
	// atomically so shuttingDown can read it without taking mu.
	inShutdown int32

	shutdownOnce sync.Once
	// done is closed after every accepted job has finished and jobBuffer
	// has been closed.
	done chan struct{}
}

// NewWorker returns a Worker with an empty queue and no runners. Call Start
// once per desired runner goroutine.
func NewWorker() *Worker {
	return &Worker{
		jobBuffer: make(chan IJob, jobBufferSize),
		done:      make(chan struct{}),
	}
}

// Start launches one runner goroutine that consumes the queue. It may be
// called several times to run jobs concurrently.
func (w *Worker) Start() {
	go w.runner()
}

// Add queues job for execution. It returns ErrNilJob for a nil job and
// ErrShuttingDown once Shutdown has been called. It blocks while the queue is
// full.
func (w *Worker) Add(job IJob) error {
	if job == nil {
		return ErrNilJob
	}

	w.mu.Lock()
	if w.shuttingDown() {
		w.mu.Unlock()
		return ErrShuttingDown
	}
	// Count the job BEFORE it becomes visible to a runner; otherwise the
	// runner's wg.Done could run first and drive the counter negative.
	w.wg.Add(1)
	w.mu.Unlock()

	// Sending outside the lock keeps a full queue from blocking Shutdown.
	// The channel cannot be closed here: Shutdown closes it only after
	// wg.Wait returns, and this job still holds a count.
	w.jobBuffer <- job
	return nil
}

// Shutdown stops accepting new jobs and waits until every accepted job has
// finished or ctx is done, whichever comes first. It returns nil in the first
// case and ctx.Err() in the second. When ctx expires first, already accepted
// jobs keep running; the runners exit once the queue has been drained.
// Shutdown may be called more than once.
func (w *Worker) Shutdown(ctx context.Context) error {
	w.shutdownOnce.Do(func() {
		w.mu.Lock()
		atomic.StoreInt32(&w.inShutdown, 1)
		w.mu.Unlock()

		go func() {
			w.wg.Wait()
			// No sender can be in flight any more, so closing is safe
			// and lets the runners leave their range loop.
			close(w.jobBuffer)
			close(w.done)
		}()
	})

	select {
	case <-w.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runner executes queued jobs one at a time until the queue is closed.
func (w *Worker) runner() {
	for job := range w.jobBuffer {
		job.Execute()
		w.wg.Done()
	}
}

// shuttingDown reports whether Shutdown has been called.
func (w *Worker) shuttingDown() bool {
	return atomic.LoadInt32(&w.inShutdown) != 0
}
// Job is a demo task identified by a name.
type Job struct {
	Name string // label printed in the start/finish log lines
}

// Execute logs that the job started, sleeps for two seconds to simulate
// work, and then logs that it finished.
func (j *Job) Execute() {
	const workDuration = 2 * time.Second

	log.Printf("%s started\n", j.Name)
	time.Sleep(workDuration)
	log.Printf("%s finished\n", j.Name)
}
// Compile-time check that *Job satisfies IJob.
var _ IJob = (*Job)(nil)
// main runs the demo: it starts two runner goroutines on a shared Worker,
// serves HTTP on :8080 where every request queues a Job named after the
// request path, and on SIGTERM, SIGHUP or SIGINT shuts down the HTTP server
// and the Worker concurrently with a shared 5 second deadline.
func main() {
	workers := NewWorker()
	// start first worker
	workers.Start()
	// start second worker
	workers.Start()

	server := &http.Server{Addr: ":8080", Handler: http.HandlerFunc(
		func(w http.ResponseWriter, req *http.Request) {
			// Drop the leading "/" without assuming the path is non-empty;
			// slicing an empty path with [1:] would panic.
			name := req.URL.Path
			if len(name) > 0 && name[0] == '/' {
				name = name[1:]
			}

			// Tell the client when the job was rejected (e.g. during
			// shutdown) instead of answering 200 for work that never runs.
			if err := workers.Add(&Job{Name: name}); err != nil {
				http.Error(w, err.Error(), http.StatusServiceUnavailable)
				return
			}

			// The body echoes request data, so pin the type to plain text
			// rather than letting it be sniffed as HTML.
			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
			if _, err := w.Write([]byte(name)); err != nil {
				log.Println("failed to write response:", err)
			}
		},
	)}

	go func() {
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalln(err)
		}
	}()

	// for graceful shutdown
	sig := make(chan os.Signal, 1)
	// os.Interrupt is the same signal as syscall.SIGINT, so it is listed once.
	signal.Notify(sig, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGINT)
	<-sig
	log.Println("handle user interruption")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var (
		wg       sync.WaitGroup
		errCount int32
	)
	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := server.Shutdown(ctx); err != nil {
			atomic.AddInt32(&errCount, 1)
			log.Println("failed to shutdown http server:", err)
		}
	}()
	go func() {
		defer wg.Done()
		if err := workers.Shutdown(ctx); err != nil {
			atomic.AddInt32(&errCount, 1)
			log.Println("failed to shutdown job server:", err)
		}
	}()
	wg.Wait()

	if atomic.LoadInt32(&errCount) == 0 {
		log.Println("shutdown application properly")
	} else {
		log.Println("shutdown application incorrectly")
	}
}
$ go run main.go
2019/12/04 15:11:30 01 started
2019/12/04 15:11:30 02 started
2019/12/04 15:11:32 01 finished
2019/12/04 15:11:32 03 started
2019/12/04 15:11:32 02 finished
2019/12/04 15:11:32 04 started
2019/12/04 15:11:34 03 finished
2019/12/04 15:11:34 05 started
2019/12/04 15:11:34 04 finished
2019/12/04 15:11:34 06 started
2019/12/04 15:11:36 05 finished
2019/12/04 15:11:36 07 started
2019/12/04 15:11:36 06 finished
2019/12/04 15:11:36 08 started
^C2019/12/04 15:11:37 handle user interruption
2019/12/04 15:11:38 07 finished
2019/12/04 15:11:38 09 started
2019/12/04 15:11:38 08 finished
2019/12/04 15:11:38 10 started
2019/12/04 15:11:40 09 finished
2019/12/04 15:11:40 10 finished
2019/12/04 15:11:40 shutdown application properly
$ go run main.go
2019/12/04 15:11:18 01 started
2019/12/04 15:11:19 02 started
^C2019/12/04 15:11:19 handle user interruption
2019/12/04 15:11:20 01 finished
2019/12/04 15:11:20 03 started
2019/12/04 15:11:21 02 finished
2019/12/04 15:11:21 04 started
2019/12/04 15:11:22 03 finished
2019/12/04 15:11:22 05 started
2019/12/04 15:11:23 04 finished
2019/12/04 15:11:23 06 started
2019/12/04 15:11:24 failed to shutdown job server: context deadline exceeded
2019/12/04 15:11:24 shutdown application incorrectly
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment