Last active
September 17, 2020 10:27
-
-
Save kazufusa/df519bd64da746b46165104da5fadf3f to your computer and use it in GitHub Desktop.
Go - job queue, worker, and gracefull shutdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Go - job queue, worker, and gracefull shutdown | |
package main | |
import ( | |
"context" | |
"errors" | |
"log" | |
"net/http" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
) | |
type IJob interface { | |
Execute() | |
} | |
type Worker struct { | |
wg sync.WaitGroup | |
jobBuffer chan *IJob | |
inShutdown int32 | |
} | |
func NewWorker() *Worker { | |
return &Worker{jobBuffer: make(chan *IJob, 1000), inShutdown: 0} | |
} | |
func (w *Worker) Start() { | |
go w.runner() | |
} | |
func (w *Worker) Add(job IJob) error { | |
if w.shuttingDown() { | |
return errors.New("can't add task in shutting down") | |
} | |
w.jobBuffer <- &job | |
w.wg.Add(1) | |
return nil | |
} | |
func (w *Worker) Shutdown(ctx context.Context) error { | |
defer close(w.jobBuffer) | |
atomic.StoreInt32(&w.inShutdown, 1) | |
done := make(chan struct{}) | |
go func() { | |
w.wg.Wait() | |
close(done) | |
}() | |
select { | |
case <-done: | |
return nil | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
func (w *Worker) runner() { | |
for { | |
select { | |
case job, ok := <-w.jobBuffer: | |
if !ok { | |
return | |
} | |
(*job).Execute() | |
w.wg.Done() | |
} | |
} | |
} | |
func (w *Worker) shuttingDown() bool { | |
return atomic.LoadInt32(&w.inShutdown) != 0 | |
} | |
type Job struct { | |
Name string | |
} | |
func (job *Job) Execute() { | |
log.Printf("%s started\n", job.Name) | |
time.Sleep(2 * time.Second) | |
log.Printf("%s finished\n", job.Name) | |
} | |
var _ IJob = (*Job)(nil) | |
func main() { | |
workers := NewWorker() | |
// start first worker | |
workers.Start() | |
// start second worker | |
workers.Start() | |
server := &http.Server{Addr: ":8080", Handler: http.HandlerFunc( | |
func(w http.ResponseWriter, req *http.Request) { | |
name := req.URL.Path[1:] | |
workers.Add(&Job{Name: name}) | |
w.Write([]byte(name)) | |
}, | |
)} | |
go func() { | |
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { | |
log.Fatalln(err) | |
} | |
}() | |
// for grafefull shutdown | |
var wg sync.WaitGroup | |
var errCount int32 = 0 | |
sig := make(chan os.Signal, 1) | |
signal.Notify(sig, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGINT, os.Interrupt) | |
select { | |
case <-sig: | |
log.Println("handle user interruption") | |
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
defer cancel() | |
wg.Add(2) | |
go func() { | |
defer wg.Done() | |
if err := server.Shutdown(ctx); err != nil { | |
atomic.StoreInt32(&errCount, 1) | |
log.Println("failed to shutdown http server:", err) | |
} | |
}() | |
go func() { | |
defer wg.Done() | |
if err := workers.Shutdown(ctx); err != nil { | |
atomic.StoreInt32(&errCount, 1) | |
log.Println("failed to shutdown job server:", err) | |
} | |
}() | |
} | |
wg.Wait() | |
if errCount == 0 { | |
log.Println("shutdown application properly") | |
} else { | |
log.Println("shutdown application incorrectly") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ go run main.go | |
2019/12/04 15:11:30 01 started | |
2019/12/04 15:11:30 02 started | |
2019/12/04 15:11:32 01 finished | |
2019/12/04 15:11:32 03 started | |
2019/12/04 15:11:32 02 finished | |
2019/12/04 15:11:32 04 started | |
2019/12/04 15:11:34 03 finished | |
2019/12/04 15:11:34 05 started | |
2019/12/04 15:11:34 04 finished | |
2019/12/04 15:11:34 06 started | |
2019/12/04 15:11:36 05 finished | |
2019/12/04 15:11:36 07 started | |
2019/12/04 15:11:36 06 finished | |
2019/12/04 15:11:36 08 started | |
^C2019/12/04 15:11:37 handle user interruption | |
2019/12/04 15:11:38 07 finished | |
2019/12/04 15:11:38 09 started | |
2019/12/04 15:11:38 08 finished | |
2019/12/04 15:11:38 10 started | |
2019/12/04 15:11:40 09 finished | |
2019/12/04 15:11:40 10 finished | |
2019/12/04 15:11:40 shutdown application properly | |
$ go run main.go | |
2019/12/04 15:11:18 01 started | |
2019/12/04 15:11:19 02 started | |
^C2019/12/04 15:11:19 handle user interruption | |
2019/12/04 15:11:20 01 finished | |
2019/12/04 15:11:20 03 started | |
2019/12/04 15:11:21 02 finished | |
2019/12/04 15:11:21 04 started | |
2019/12/04 15:11:22 03 finished | |
2019/12/04 15:11:22 05 started | |
2019/12/04 15:11:23 04 finished | |
2019/12/04 15:11:23 06 started | |
2019/12/04 15:11:24 failed to shutdown job server: context deadline exceeded | |
2019/12/04 15:11:24 shutdown application incorrectly |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment