A resilient Go worker
package main
// This is an example of a resilient worker program written in Go.
//
// This program will run a worker, wait 5 seconds, and run it again.
// It exits when SIGINT or SIGTERM is received, while ensuring any ongoing work
// is finished before exiting.
//
// Unexpected panics are also handled: program won't crash if the worker panics.
// However, panics in goroutines started by the worker won't be handled and have
// to be dealt with manually.
import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"runtime/debug"
	"sync"
	"syscall"
	"time"
)
var (
	sigChan     chan os.Signal // Used for shutdown
	workerChan  chan bool      // Used for worker return value (can be of any type)
	triggerChan chan bool      // Used to trigger a new worker run
	waitGroup   sync.WaitGroup
	errLogger   *log.Logger
)
const sleepDuration = 5 * time.Second
func runWorker() {
	defer recoverWorker()
	// Fake long process
	time.Sleep(sleepDuration)
	waitGroup.Done()
	workerChan <- true
}
func recoverWorker() {
	if err := recover(); err != nil {
		// Handle unexpected panic
		errLogger.Println(err)
		errLogger.Print(string(debug.Stack()))
		// Finish worker execution anyway
		waitGroup.Done()
		// Send false instead to stop the service after a panic.
		// Sending true keeps the service running.
		workerChan <- true
	}
}
func runTimer() {
	for {
		triggerChan <- true
		if !<-workerChan { // Exit if worker returned false
			sigChan <- syscall.SIGTERM
			return
		}
		time.Sleep(sleepDuration)
	}
}
func listen() {
	for {
		select {
		case <-sigChan:
			// Wait for worker to finish before exit
			waitGroup.Wait()
			return
		case <-triggerChan:
			waitGroup.Add(1)
			go runWorker()
		}
	}
}
func setup() {
	sigChan = make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	workerChan = make(chan bool)
	triggerChan = make(chan bool)
	errLogger = log.New(os.Stderr, "", log.LstdFlags)
}
func main() {
	setup()
	go runTimer()
	fmt.Println("Service running...")
	listen()
	fmt.Println("Service stopped.")
}
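
The header comment notes that panics in goroutines started by the worker are not recovered and have to be handled manually. A minimal sketch of one way to do that is shown here, assuming a hypothetical helper named `safeGo` (it is not part of the gist) that wraps each child goroutine in its own `recover`. The `runChildren` function and the `record` callback are likewise illustrative names only.

```go
package main

import (
	"fmt"
	"sync"
)

// safeGo is a hypothetical helper (not part of the gist). It runs fn in a new
// goroutine, recovers any panic raised by fn, and reports it through onPanic.
func safeGo(wg *sync.WaitGroup, fn func(), onPanic func(recovered interface{})) {
	wg.Add(1)
	go func() {
		// Deferred calls run in reverse order: the recover handler runs
		// first, then wg.Done marks the goroutine as finished.
		defer wg.Done()
		defer func() {
			if r := recover(); r != nil {
				onPanic(r)
			}
		}()
		fn()
	}()
}

// runChildren starts one panicking and one healthy child goroutine and
// returns how many panics were recovered.
func runChildren() int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		panics int
	)
	record := func(interface{}) {
		mu.Lock()
		panics++
		mu.Unlock()
	}
	safeGo(&wg, func() { panic("child failed") }, record)
	safeGo(&wg, func() {}, record)
	wg.Wait()
	return panics
}

func main() {
	fmt.Println("recovered panics:", runChildren()) // prints "recovered panics: 1"
}
```

A worker could start its child goroutines through such a helper instead of a bare `go` statement, so that a panic in a child is logged rather than crashing the whole process.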

qaisjp commented Feb 16, 2020

Why line 58 and 63?

Author

System-Glitch commented Feb 16, 2020

> Why line 58 and 63?

The timer and the signal listener aren't on the same goroutine. Line 58 (now 57) triggers line 74 (now 73) and runs the worker.

This example is a service that runs a worker repeatedly: when the worker is done, the service waits 5 seconds and runs it again.


qaisjp commented Feb 17, 2020

<-workerChan is read in two places (and in two goroutines), but only L58 checks the value; L73 does not.

It probably works fine here because there's only one worker spawned at a time, but surely this breaks once you get multiple workers?
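
To make the multiple-worker concern concrete, here is a rough sketch of one alternative, which is not what the gist does: each worker sends a tagged result on a shared buffered channel, and the coordinator inspects every result. The `result` type, `runBatch`, and the `work` callback are hypothetical names introduced for illustration, and panic recovery is left out to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a hypothetical type that tags each outcome with the worker's ID.
type result struct {
	id int
	ok bool
}

// runBatch starts n workers and returns true only if every worker reported ok.
// work stands in for the real job.
func runBatch(n int, work func(id int) bool) bool {
	results := make(chan result, n) // buffered so workers never block on send
	var wg sync.WaitGroup
	for id := 0; id < n; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			results <- result{id: id, ok: work(id)}
		}(id)
	}
	wg.Wait()
	close(results) // safe: all senders have finished

	allOK := true
	for r := range results {
		if !r.ok {
			allOK = false
		}
	}
	return allOK
}

func main() {
	fmt.Println(runBatch(3, func(id int) bool { return true }))    // prints "true"
	fmt.Println(runBatch(3, func(id int) bool { return id != 1 })) // prints "false"
}
```

Because every result is read and checked in one place, no worker's return value can be silently dropped by a second reader.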

Also, to confirm, L57 and L58 won't immediately execute because the channel is unbuffered, so L57 blocks until another goroutine reads from the channel. (Guaranteeing that L58 won't just read the true passed in from L57.) Right?
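
The blocking behaviour described here can be checked with a small standalone program. This is a minimal sketch, not taken from the gist: `sendStaysPending` is a hypothetical function name, and the 50 ms wait is an arbitrary value chosen for the demonstration.

```go
package main

import (
	"fmt"
	"time"
)

// sendStaysPending reports whether a send on an unbuffered channel is still
// blocked after a short wait during which no goroutine receives from it.
func sendStaysPending() bool {
	ch := make(chan bool)       // unbuffered, like triggerChan in the gist
	sent := make(chan struct{}) // closed once the send has completed

	go func() {
		ch <- true // blocks until another goroutine receives
		close(sent)
	}()

	select {
	case <-sent:
		return false // would mean the send completed without a receiver
	case <-time.After(50 * time.Millisecond):
	}

	<-ch   // receive the value, which unblocks the sender
	<-sent // wait for the sender goroutine to finish
	return true
}

func main() {
	fmt.Println("send still pending without a receiver:", sendStaysPending())
}
```

A goroutine blocked in a send cannot also be the receiver of that value, so the goroutine that sends on one channel and then receives on another only proceeds to the receive once some other goroutine has taken the sent value.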

Author

System-Glitch commented Feb 17, 2020

I fixed this potential issue by using a third channel.
