Skip to content

Instantly share code, notes, and snippets.

@bacher09
Last active December 1, 2017 13:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bacher09/203303efeb1605835dbc1e13f7f75254 to your computer and use it in GitHub Desktop.
Save bacher09/203303efeb1605835dbc1e13f7f75254 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import sys
import time
import fcntl
import socket
import signal
import argparse
import subprocess
import struct
class WatchdogError(RuntimeError):
    """Raised when the sd_notify environment is missing/invalid or not connected."""
    pass


class SystemdWatchdog(object):
    """Minimal client for the systemd ``sd_notify`` datagram protocol.

    The destination address is read from ``$NOTIFY_SOCKET``.  Messages are
    plain ``KEY=VALUE`` byte strings (``READY=1``, ``WATCHDOG=1``, ...) sent
    as single datagrams.  When ``pid`` is passed to a send method, the
    datagram carries ``SCM_CREDENTIALS`` ancillary data naming that process.
    """

    _SOCKET_ENV = "NOTIFY_SOCKET"

    def __init__(self):
        self.socket = None   # AF_UNIX datagram socket, created by connect()
        self.address = None  # destination address derived from NOTIFY_SOCKET

    def _connect_socket(self):
        """Validate ``$NOTIFY_SOCKET`` and create the datagram socket.

        No ``connect()`` call is made; every message is sent with
        ``sendto``/``sendmsg`` to ``self.address``.  Any socket created by an
        earlier call is closed first so reconnecting does not leak it.

        Raises:
            WatchdogError: the variable is unset/empty or is not an absolute
                path (``/...``) or abstract name (``@...``).
        """
        address = os.environ.get(self._SOCKET_ENV)
        if not address:
            msg = "{0} is empty or isn't set".format(self._SOCKET_ENV)
            raise WatchdogError(msg)
        if len(address) <= 1 or address[0] not in ('/', '@'):
            msg = "{0} contains bad socket address".format(self._SOCKET_ENV)
            raise WatchdogError(msg)
        if address[0] == '@':
            # '@name' denotes a Linux abstract-namespace socket, whose
            # address starts with a NUL byte.
            address = "\0" + address[1:]
        # Release a socket from a previous connect() before replacing it.
        self.close()
        if hasattr(socket, "SOCK_CLOEXEC"):
            sock_type = socket.SOCK_DGRAM | socket.SOCK_CLOEXEC
            sock = socket.socket(socket.AF_UNIX, sock_type)
        else:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            fcntl.fcntl(sock, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        self.socket = sock
        self.address = address

    def _make_ucred_ancdata(self, pid, uid, gid):
        """Build an ``SCM_CREDENTIALS`` ancillary-data item for ``sendmsg``.

        The payload is three native unsigned ints (pid, uid, gid), the
        layout of Linux ``struct ucred``.
        """
        ucred = struct.pack("@III", pid, uid, gid)
        return (socket.SOL_SOCKET, socket.SCM_CREDENTIALS, ucred)

    def _clean_environment(self):
        """Remove ``$NOTIFY_SOCKET`` from ``os.environ`` if present."""
        try:
            del os.environ[self._SOCKET_ENV]
        except KeyError:
            pass

    def watchdog_period(self):
        """Return the watchdog timeout from ``$WATCHDOG_USEC`` in seconds.

        Returns:
            float: the timeout converted from microseconds.

        Raises:
            WatchdogError: the variable is unset, empty, not an integer, or
                not positive (systemd treats 0 as "watchdog disabled").
        """
        val = os.environ.get("WATCHDOG_USEC")
        if not val:
            raise WatchdogError("WATCHDOG_USEC is empty or isn't set")
        try:
            usec = int(val)
        except ValueError:
            msg = "WATCHDOG_USEC has incorrect value, it should be int"
            raise WatchdogError(msg) from None
        if usec <= 0:
            # A non-positive period would make callers sleep for no time
            # at all and spin in a tight ping loop.
            msg = "WATCHDOG_USEC should be a positive number of microseconds"
            raise WatchdogError(msg)
        return usec / 1000000

    def connect(self, clean_environment=True):
        """Create the notification socket from ``$NOTIFY_SOCKET``.

        Args:
            clean_environment: when true, remove ``$NOTIFY_SOCKET`` from
                ``os.environ`` afterwards so processes started later do not
                inherit it.
        """
        self._connect_socket()
        if clean_environment:
            self._clean_environment()

    def close(self):
        """Close the notification socket, if any.  Safe to call repeatedly."""
        if self.socket is not None:
            self.socket.close()
        self.socket = None
        self.address = None

    def send(self, message, pid=None):
        """Send one raw notification datagram.

        Args:
            message: the payload; must be ``bytes``.
            pid: if given, attach ``SCM_CREDENTIALS`` with this pid and the
                current uid/gid.  NOTE(review): the kernel may reject a pid
                other than the sender's own unless the sender is privileged.

        Raises:
            WatchdogError: connect() has not been called.
            TypeError: ``message`` is not ``bytes``.
        """
        if self.socket is None or self.address is None:
            raise WatchdogError("Not connected")
        # Explicit check rather than ``assert`` so it survives ``python -O``.
        if not isinstance(message, bytes):
            raise TypeError("message must be bytes, not {0}".format(
                type(message).__name__))
        if pid is None:
            self.socket.sendto(message, self.address)
        else:
            scm_creds = self._make_ucred_ancdata(pid, os.getuid(), os.getgid())
            self.socket.sendmsg([message], [scm_creds], 0, self.address)

    def ready(self, pid=None):
        "Application is started and ready"
        self.send(b"READY=1", pid=pid)

    def stop(self, pid=None):
        "Applications is going down"
        self.send(b"STOPPING=1", pid=pid)

    def ping(self, pid=None):
        "Send notification that application is alive"
        self.send(b"WATCHDOG=1", pid=pid)

    def mainpid(self, main_pid, pid=None):
        "Notify systemd about main pid"
        self.send("MAINPID={pid}".format(pid=main_pid).encode('ascii'), pid)
class ServiceWatchdog(SystemdWatchdog):
    """Run a service command and ping the systemd watchdog while it is healthy.

    ``start_process`` forks: the parent execs the service command, keeping
    this script's PID.  The child runs the health-check command every half
    watchdog period and, after each successful check, sends ``WATCHDOG=1``
    with the service's PID attached.
    """

    description = "Service health indicator for systemd watchdog"

    def __init__(self):
        super(ServiceWatchdog, self).__init__()
        self.parser = self.build_parser()
        self.service_pid = None     # PID of the exec'd service (the child's parent)
        self.sleep_interval = None  # half the watchdog period, in seconds

    def wait(self, timeout):
        """Sleep ``timeout`` seconds in steps of at most one second.

        After every step, exit with status 0 if the parent PID no longer
        equals ``self.service_pid``, i.e. the service process went away and
        this child was re-parented.
        """
        while timeout > 0:
            step = min(timeout, 1)
            time.sleep(step)
            timeout -= step
            if os.getppid() != self.service_pid:  # service died
                sys.exit(0)

    def run(self, args=None):
        """Parse ``args``, start the service, then ping the watchdog forever.

        Args:
            args: argument list for the parser; ``None`` means ``sys.argv[1:]``.
        """
        try:
            namespace = self.parser.parse_args(args)
            self.start_process(namespace.command[0], namespace.command)
            # Only the forked child gets here; the parent was replaced by the
            # service command via exec in start_process().
            self.connect(clean_environment=namespace.clean_env)
            # systemd recommends pinging every half watchdog period; the same
            # value also bounds how long each health check may run.
            self.sleep_interval = self.watchdog_period() / 2
            while True:
                self.wait(self.sleep_interval)
                if self.check_service(namespace.health_cmd):
                    self.ping(pid=self.service_pid)
        except KeyboardInterrupt:
            pass

    def check_service(self, check_cmd):
        """Run ``check_cmd`` once and report whether it exited with status 0.

        The command is executed without a shell and without argument
        splitting (argv is ``[check_cmd]``).  A check that runs longer than
        ``self.sleep_interval`` seconds, or that cannot be started at all
        (e.g. missing executable), counts as a failure instead of aborting
        the watchdog loop.
        """
        try:
            return_code = subprocess.call([check_cmd],
                                          timeout=self.sleep_interval)
        except subprocess.TimeoutExpired:
            # subprocess.call() has already killed the check at this point.
            return False
        except OSError as err:
            sys.stderr.write(
                "Health check {0!r} could not be started: {1}\n".format(
                    check_cmd, err))
            return False
        return return_code == 0

    def start_process(self, cmd_file, args):
        """Fork; the parent execs the service and the child returns.

        ``self.service_pid`` is set to this process's PID before forking.
        The parent keeps that PID across ``exec``, so the child can later
        notify on the service's behalf and detect its death via getppid().

        Raises:
            OSError: fork failed (``os.fork`` raises instead of returning a
                negative value), or, in the parent, exec failed.
        """
        self.service_pid = os.getpid()
        pid = os.fork()
        if pid > 0:
            self._disable_cloexec()
            os.execvp(cmd_file, args)
        # Child process: return to run() and perform health checks.

    def _disable_cloexec(self):
        """Clear FD_CLOEXEC on every fd in /proc/self/fd so it survives exec.

        Best effort: descriptors that cannot be updated are skipped (e.g.
        the directory fd used by os.listdir(), which is already closed by
        the time the loop reaches it).
        """
        for text_fd in os.listdir('/proc/self/fd'):
            if not text_fd.isdigit():
                continue
            try:
                fd = int(text_fd)
                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, flags & ~fcntl.FD_CLOEXEC)
            except (ValueError, OSError, IOError):
                pass

    @classmethod
    def build_parser(cls):
        """Return the argparse parser for ``-c/--health-cmd``, ``--clean-env``
        and the service command (one or more positional words)."""
        parser = argparse.ArgumentParser(description=cls.description)
        parser.add_argument('-c', '--health-cmd', required=True,
                            metavar="COMMAND",
                            help="Command to check service health, on "
                                 "success should return 0 exit code")
        parser.add_argument('--clean-env', action='store_true',
                            help="Remove NOTIFY_SOCKET from environment to "
                                 "prevent access to systemd bus from child "
                                 "process")
        parser.add_argument('command', nargs='+',
                            help="Start this command as service")
        return parser

    @classmethod
    def start(cls, args=None):
        """Create an instance and run it with ``args`` (default: sys.argv[1:])."""
        cls().run(args)
if __name__ == '__main__':
    # Script entry point: build the watchdog and feed it the command line.
    ServiceWatchdog().run(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment