Skip to content

Instantly share code, notes, and snippets.

@rsms
Created October 3, 2020 00:18
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rsms/b70b4c7fe3b25e17b4b1f6af8b007c14 to your computer and use it in GitHub Desktop.
Save rsms/b70b4c7fe3b25e17b4b1f6af8b007c14 to your computer and use it in GitHub Desktop.
Example Go HTTP server with systemd socket activation and zero-downtime restart
[Unit]
Description = Foo HTTP server
Requires = foo.socket
After = multi-user.target
[Service]
User = www-data
Group = www-data
WorkingDirectory = /var/foo
ExecStart = /var/foo/bin/foo-server
ExecReload = /bin/kill -HUP $MAINPID
Restart = always
NotifyAccess = main
KillMode = process
NonBlocking = true
[Unit]
Description=Foo HTTP server socket
[Socket]
ListenStream = 80
BindIPv6Only = both
package main
import (
"context"
"fmt"
"net/http"
"os"
"time"
"github.com/coreos/go-systemd/activation"
"rsms/systemd"
)
func main() {
httpRoutes := &http.ServeMux{}
httpServer := &http.Server{ Handler: httpRoutes, ...}
// http endpoint that slowly sends chunks back over the course of 10 seconds
httpRoutes.HandleFunc("/slow", func(w http.ResponseWriter, req *http.Request) {
flusher, _ := w.(http.Flusher)
w.Header().Set("Transfer-Encoding", "chunked")
for i := 0; ; {
fmt.Fprintf(w, "Chunk %d from server process %d\n", i, os.Getpid())
flusher.Flush() // Trigger "chunked" encoding and send a chunk...
i++
if i == 10 {
break
}
time.Sleep(1000 * time.Millisecond)
}
w.Header().Set("Content-Length", "0")
})
done := systemd.EnableGracefulShutdown(func(asyncContinue func()) {
timeout := time.Second
if asyncContinue != nil {
timeout = 30 * time.Second
httpServer.Server.RegisterOnShutdown(asyncContinue)
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
httpServer.Server.SetKeepAlivesEnabled(false)
if err := httpServer.Server.Shutdown(ctx); err != nil {
fmt.Printf("graceful shutdown error: %s\n", err)
} else {
fmt.Printf("graceful shutdown complete\n")
}
})
if err := listenAndServe(); err != nil {
panic(err)
}
// wait for shutdown
<-done
}
// listenAndServe serves HTTP with s, using the socket handed over by systemd socket
// activation when there is one and falling back to s.ListenAndServe() otherwise.
//
// Exactly zero or one systemd socket is supported. If systemd passes several sockets, or
// a socket that is not a stream listener, an error is returned; in the multi-socket case
// call s.Serve(l) for each listener directly instead of using this helper.
//
// It blocks until the server stops. http.ErrServerClosed (the result of a regular
// Shutdown) is reported as nil.
func listenAndServe(s *http.Server) error {
	// get listeners from systemd
	listeners, err := activation.Listeners()
	if err != nil {
		// Note: current implementation of go-systemd/activation never returns an error so
		// it's unclear under what conditions it might do so in the future.
		return err
	}

	switch len(listeners) {
	case 0:
		// no systemd socket for this process
		err = s.ListenAndServe()
	case 1:
		l := listeners[0]
		if l == nil {
			// activation.Listeners fills slots of non-listener fds with nil.
			return fmt.Errorf("socket passed by systemd is not a stream listener")
		}
		// start accepting connections from the systemd-provided socket
		println("using socket from systemd socket activation")
		err = s.Serve(l)
	default:
		// We can only handle a single socket; fail if we get more than 1.
		// Release our copies of the sockets we are not going to serve.
		for _, l := range listeners {
			if l != nil {
				l.Close()
			}
		}
		return fmt.Errorf("expected at most one socket from systemd, got %d", len(listeners))
	}

	if err == http.ErrServerClosed {
		// returned from Serve functions when server.Shutdown() was initiated
		err = nil
	}
	return err
}
package systemd
import (
"context"
"fmt"
"io"
"net"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/coreos/go-systemd/daemon"
)
// ShutdownMode enumerates the ways a shutdown can be carried out.
//
// NOTE(review): nothing in the visible source references this type or its constants.
// Presumably they were meant to distinguish the synchronous (SIGINT/SIGTERM) and
// asynchronous (SIGHUP reload) paths of EnableGracefulShutdown -- confirm or remove.
type ShutdownMode int
const (
// ShutdownSync is the zero value of ShutdownMode.
ShutdownSync ShutdownMode = iota
// ShutdownAsync has the value 1.
ShutdownAsync
)
// GetInvocationId reports the systemd invocation ID of the current process, taken from
// the INVOCATION_ID environment variable.
//
// An empty string is returned when the process does not appear to be run by systemd,
// i.e. when INVOCATION_ID is unset or when JOURNAL_STREAM (which systemd >=v231 sets
// alongside it) is missing. Callers can therefore use a non-empty result as a
// "controlled by systemd" test.
func GetInvocationId() string {
	id := os.Getenv("INVOCATION_ID")
	if id == "" || os.Getenv("JOURNAL_STREAM") == "" {
		return ""
	}
	return id
}
// EnableGracefulShutdown installs signal handlers and invokes shutdownHandler when the
// process should shut down.
//
// The returned channel closes when shutdownHandler has been called and returned.
// Only the first signal received is acted upon.
//
// When SIGINT or SIGTERM is received, shutdownHandler(nil) is called.
// This corresponds to "service NAME stop" and "service NAME restart" systemd actions.
// The shutdownHandler should shut down as quickly as possible. Shutdown is synchronous.
//
// When SIGHUP is received, shutdownHandler(ready func()) is called.
// This corresponds to "service NAME reload" systemd action.
// The shutdownHandler should do the following when the "ready" function is non-nil:
//
//  1. Stop accepting network requests, stop listening and relinquish any exclusive-access
//     resources that only one process should access at a time, like file-based databases.
//
//  2. Call ready()
//
//  3. Proceed with shut down as smoothly as possible, taking whatever time it needs.
//     This may include waiting for ongoing processes or network requests to complete.
//
// When ready() is called a new process will be spawned to take the place of the current
// one. ready may be called from any goroutine and more than once; only the first call
// made before the internal 5 second deadline has an effect. If ready is not called within
// that deadline (or the handler returns without calling it) the reload proceeds anyway.
//
// Common use of this function looks like this:
//
//	func main() {
//		done := EnableGracefulShutdown(func(ready func()) {
//			stopAcceptingConnections()
//			closeDatabase()
//			ready()
//			waitForCurrentWorkToFinish()
//		})
//		openDatabase()
//		startAcceptingConnections()
//		<-done
//	}
func EnableGracefulShutdown(shutdownHandler func(ready func())) chan struct{} {
	done := make(chan struct{})
	quit := make(chan os.Signal, 1)
	signal.Notify(quit,
		// for reload:
		syscall.SIGHUP,
		// for stop or full restart:
		syscall.SIGINT, syscall.SIGTERM,
	)

	go func() {
		sig := <-quit
		fmt.Printf("systemdInstallSignalHandlers got signal %v\n", sig)

		switch sig {
		case syscall.SIGINT, syscall.SIGTERM:
			// Shutdown asap
			shutdownHandler(nil)

		case syscall.SIGHUP:
			// Execute a short-lived process and ask systemd to track it instead of us.
			fmt.Println("[reload] begin")

			// Two separate channels are used so that a "ready" notification can never be
			// mistaken for "handler has returned":
			//   readyCh    receives (at most) one value when the user calls ready().
			//   finishedCh is closed when shutdownHandler has returned.
			// readyCh is buffered so that ready() never blocks the caller.
			readyCh := make(chan struct{}, 1)
			finishedCh := make(chan struct{})

			// Use a short timeout to catch user error -- the continuation callback should
			// be invoked usually immediately or within a few milliseconds of
			// shutdownHandler. This way we also avoid stalling the reload forever if
			// shutdownHandler never calls the continuation callback.
			initCallbackTimeout := 5 * time.Second
			ctx, cancelCtx := context.WithTimeout(context.Background(), initCallbackTimeout)
			defer cancelCtx() // release the timer on every path, including timeout

			// Invoke the user shutdown handler in a goroutine so we can wait for the
			// ready() call and the handler's return independently.
			go func() {
				shutdownHandler(func() {
					fmt.Println("[reload] user called asyncContinue")
					select {
					case <-ctx.Done():
						// too late (timed out) or already signalled; nothing to do
						fmt.Println("[reload] user called asyncContinue (timeout)")
					default:
						// finished in time
						fmt.Println("[reload] user called asyncContinue (ok)")
						select {
						case readyCh <- struct{}{}:
						default:
							// a concurrent ready() call already signalled
						}
					}
				})
				close(finishedCh)
			}()

			// wait for shutdown to begin (i.e. to relinquish exclusive-access resources)
			fmt.Println("[reload] waiting for asyncContinue")
			select {
			case <-readyCh:
				cancelCtx()
			case <-finishedCh:
				// handler returned without calling ready(); nothing left to wait for
				cancelCtx()
			case <-ctx.Done():
				// timeout
				fmt.Fprintf(
					os.Stderr,
					"timeout in systemd.EnableGracefulShutdown handler: "+
						"Continuation function was not called within %s.\n",
					initCallbackTimeout,
				)
			}

			// spawn decoy process
			fmt.Println("[reload] spawning decoy process")
			pid, err := detachedSleep()
			if err != nil {
				fmt.Printf("error in detachedSleep %s\n", err.Error())
			} else {
				// tell systemd to track the decoy process instead of the current process
				if _, err := daemon.SdNotify(false, fmt.Sprintf("MAINPID=%d", pid)); err != nil {
					fmt.Fprintf(os.Stderr, "[reload] sending MAINPID notification failed: %s\n", err)
				}

				// Wait for confirmation from systemd.
				// Since we relinquished control to the decoy process with MAINPID= this
				// will cause a warning e.g. "Got notification message from PID 123, but
				// reception only permitted for main PID 456" in the systemd journal log.
				// However it is a good trick that works: when this function returns it
				// means that systemd has received the MAINPID= message, starting a new
				// process, and we can safely proceed with shutdown of this current process.
				//
				// An alternative to this is to sleep for some long period of time, like
				// 1s, to reduce the probability of systemd missing our MAINPID message.
				//
				// Note that as soon as systemd has interpreted the MAINPID notification it
				// proceeds with starting a new process, so whatever we do beyond this
				// point only affects shutdown of this current process.
				if err := SdNotifyBarrier(false, time.Second); err != nil {
					fmt.Fprintf(os.Stderr, "[reload] notify barrier failed: %s\n", err)
				}

				// give systemd a tiny bit extra time in case the implementation
				// interpreted the BARRIER notification internally on a separate message
				// queue than MAINPID.
				time.Sleep(10 * time.Millisecond)
			}

			// wait for shutdown to complete (this may take whatever time)
			fmt.Println("awaiting shutdown finalization")
			<-finishedCh
		} // switch sig

		close(done)
	}()

	return done
}
func init() {
// As early as possible, check if we should be the decoy.
state := os.Getenv("__SD_SHUTDOWN")
os.Unsetenv("__SD_SHUTDOWN")
switch state {
case "1":
// First step, fork again.
execPath, err := selfExeFile()
if err != nil {
panic(fmt.Errorf("selfExeFile error %s", err))
}
child, err := os.StartProcess(
execPath,
[]string{execPath},
&os.ProcAttr{
Env: append(os.Environ(), "__SD_SHUTDOWN=2"),
})
if err != nil {
panic(fmt.Errorf("cannot execute sleep command: %s", err))
}
// Advertise child's PID and exit. Child will be
// orphaned and adopted by PID 1.
fmt.Printf("%d", child.Pid)
os.Exit(0)
case "2":
// wait for systemd
SdNotifyBarrier(false, time.Second)
// time.Sleep(time.Millisecond * 10000)
os.Exit(0)
}
// Not the shutdown helper. Business as usual.
}
// selfExeFile resolves the /proc/self/exe symlink to obtain the path of the running
// executable. When the link target carries a trailing " (deleted)" marker (for example
// because the binary was replaced during an upgrade) the marker is stripped.
// The error from reading the link, if any, is returned unchanged.
func selfExeFile() (string, error) {
	target, err := os.Readlink("/proc/self/exe")
	return strings.TrimSuffix(target, " (deleted)"), err
}
// detachedSleep launches the short-lived decoy process and returns its PID.
//
// It runs this very executable with __SD_SHUTDOWN=1 in its environment, waits for that
// helper to exit and parses the decimal PID the helper printed on stdout. A full
// daemonization is not needed as the decoy is short-lived.
func detachedSleep() (uint64, error) {
	exePath, err := selfExeFile()
	if err != nil {
		return 0, err
	}
	fmt.Printf("detachedSleep starting command %q\n", exePath)

	helper := exec.Command(exePath)
	helper.Env = append(os.Environ(), "__SD_SHUTDOWN=1")
	stdout, err := helper.Output()
	if err != nil {
		return 0, err
	}

	pidText := strings.TrimSpace(string(stdout))
	decoyPid, err := strconv.ParseUint(pidText, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("cannot parse PID of sleep command: %s", err)
	}
	return decoyPid, nil
}
// SdNotifyBarrier blocks until the service manager has processed all notification
// messages sent before this call, or until timeout has elapsed. It is modelled after
// libsystemd's sd_notify_barrier.
//
// It works by sending "BARRIER=1" together with the write end of a pipe to the socket
// named by the NOTIFY_SOCKET environment variable and then waiting for the pipe to be
// closed by the receiver.
//
// If unsetEnvironment is true, NOTIFY_SOCKET is removed from the environment.
//
// An error is returned if NOTIFY_SOCKET is not set, if the message cannot be sent, or if
// the pipe is not closed within timeout. All file descriptors opened by this function are
// closed before it returns.
func SdNotifyBarrier(unsetEnvironment bool, timeout time.Duration) error {
	// construct unix socket address from systemd environment variable
	socketAddr := &net.UnixAddr{
		Name: os.Getenv("NOTIFY_SOCKET"),
		Net:  "unixgram",
	}
	if socketAddr.Name == "" {
		return fmt.Errorf("NOTIFY_SOCKET missing")
	}

	// create a pipe for communicating with systemd daemon
	pipeR, pipeW, err := os.Pipe()
	if err != nil {
		return err
	}
	defer pipeR.Close()
	// Safety net for the error paths; on the success path the write end is closed
	// explicitly below and this second Close is a harmless no-op error.
	defer pipeW.Close()

	if unsetEnvironment {
		if err := os.Unsetenv("NOTIFY_SOCKET"); err != nil {
			return err
		}
	}

	// connect to unix socket at socketAddr
	conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
	if err != nil {
		return err
	}
	defer conn.Close()

	// send over write end of the pipe to the systemd daemon
	fdRights := syscall.UnixRights(int(pipeW.Fd()))
	if _, _, err := conn.WriteMsgUnix([]byte("BARRIER=1"), fdRights, nil); err != nil {
		return err
	}

	// Close our copy of the write end so that the read below sees EOF as soon as the
	// receiver closes its copy.
	if err := pipeW.Close(); err != nil {
		return err
	}

	// wait for systemd to close the pipe
	if err := pipeR.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	var b [1]byte
	_, err = pipeR.Read(b[:])
	if err == io.EOF {
		err = nil
	}
	return err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment