Last active
December 1, 2017 13:09
-
-
Save bacher09/203303efeb1605835dbc1e13f7f75254 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import time | |
import fcntl | |
import socket | |
import signal | |
import argparse | |
import subprocess | |
import struct | |
class WatchdogError(RuntimeError):
    """Raised when the systemd watchdog environment or socket is unusable."""
class SystemdWatchdog(object):
    """Minimal client for the systemd notification socket.

    The destination address comes from the ``NOTIFY_SOCKET`` environment
    variable; each message is sent as one datagram over an ``AF_UNIX``
    socket.
    """

    _SOCKET_ENV = "NOTIFY_SOCKET"

    def __init__(self):
        # Datagram socket and destination address; both stay None until
        # connect() succeeds (and again after close()).
        self.socket = None
        self.address = None

    def _connect_socket(self):
        """Open a datagram socket aimed at the address in NOTIFY_SOCKET.

        No connect() syscall is made: the address is stored and every
        message is sent with an explicit destination.

        Raises:
            WatchdogError: if the variable is unset/empty, or is neither an
                absolute path nor an abstract (``@``-prefixed) name.
        """
        address = os.environ.get(self._SOCKET_ENV)
        if not address:
            msg = "{0} is empty or isn't set".format(self._SOCKET_ENV)
            raise WatchdogError(msg)

        if len(address) <= 1 or address[0] not in ('/', '@'):
            msg = "{0} contains bad socket address".format(self._SOCKET_ENV)
            raise WatchdogError(msg)

        if address[0] == '@':
            # Abstract-namespace socket: the kernel expects a leading NUL.
            address = "\0" + address[1:]

        if hasattr(socket, "SOCK_CLOEXEC"):
            sock_type = socket.SOCK_DGRAM | socket.SOCK_CLOEXEC
            sock = socket.socket(socket.AF_UNIX, sock_type)
        else:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            fcntl.fcntl(sock, fcntl.F_SETFD, fcntl.FD_CLOEXEC)

        # Reconnecting must not leak the descriptor of a previous socket.
        self.close()
        self.socket = sock
        self.address = address

    def _make_ucred_ancdata(self, pid, uid, gid):
        """Build an SCM_CREDENTIALS ancillary-data item for sendmsg().

        Packs ``(pid, uid, gid)`` in native layout, like Linux
        ``struct ucred``, where pid_t is signed and uid_t/gid_t are unsigned.
        """
        ucred = struct.pack("@iII", pid, uid, gid)
        return (socket.SOL_SOCKET, socket.SCM_CREDENTIALS, ucred)

    def _clean_environment(self):
        """Drop NOTIFY_SOCKET from os.environ; a missing key is not an error."""
        os.environ.pop(self._SOCKET_ENV, None)

    def watchdog_period(self):
        """Return the watchdog period in seconds (float) from WATCHDOG_USEC.

        Raises:
            WatchdogError: if the variable is unset/empty, is not an integer,
                or is not positive.
        """
        val = os.environ.get("WATCHDOG_USEC")
        if not val:
            raise WatchdogError("WATCHDOG_USEC is empty or isn't set")
        try:
            usec = int(val)
        except ValueError:
            msg = "WATCHDOG_USEC have incorect value, it should be int"
            raise WatchdogError(msg)
        if usec <= 0:
            # A zero/negative period is meaningless for a watchdog, and a
            # polling loop driven by it would never sleep.
            raise WatchdogError("WATCHDOG_USEC should be a positive int")
        return usec / 1000000

    def connect(self, clean_environment=True):
        """Prepare the notification socket.

        Args:
            clean_environment: when true, remove NOTIFY_SOCKET from
                os.environ afterwards, so processes spawned later do not
                inherit it.
        Raises:
            WatchdogError: if NOTIFY_SOCKET is missing or malformed.
        """
        self._connect_socket()
        if clean_environment:
            self._clean_environment()

    def close(self):
        """Close the notification socket, if open. Safe to call repeatedly."""
        if self.socket is not None:
            self.socket.close()
        self.socket = None
        self.address = None

    def send(self, message, pid=None):
        """Send one raw notification datagram.

        Args:
            message: payload bytes, e.g. ``b"READY=1"``.
            pid: if given, SCM_CREDENTIALS carrying this pid (plus the
                caller's uid/gid) are attached, so the receiver sees the
                message as sent by that pid.
        Raises:
            WatchdogError: if connect() has not succeeded.
            TypeError: if message is not bytes.
        """
        if self.socket is None or self.address is None:
            raise WatchdogError("Not connected")
        # Explicit check instead of assert, which is stripped under -O.
        if not isinstance(message, bytes):
            raise TypeError("message must be bytes, not {0}".format(
                type(message).__name__))
        if pid is None:
            self.socket.sendto(message, self.address)
        else:
            creds = self._make_ucred_ancdata(pid, os.getuid(), os.getgid())
            self.socket.sendmsg([message], [creds], 0, self.address)

    def ready(self, pid=None):
        "Application is started and ready"
        self.send(b"READY=1", pid=pid)

    def stop(self, pid=None):
        "Application is going down"
        self.send(b"STOPPING=1", pid=pid)

    def ping(self, pid=None):
        "Send notification that application is alive"
        self.send(b"WATCHDOG=1", pid=pid)

    def mainpid(self, main_pid, pid=None):
        "Notify systemd about main pid"
        self.send("MAINPID={pid}".format(pid=main_pid).encode('ascii'), pid)
class ServiceWatchdog(SystemdWatchdog):
    """Run a service command and ping the systemd watchdog while it is healthy.

    The original process is replaced (via exec) by the service command, so
    the service keeps the pid this wrapper was started with. A forked child
    periodically runs the health-check command and sends ``WATCHDOG=1`` on
    the service's behalf only when the check exits with status 0.
    """

    description = "Service health indicator for systemd watchdog"

    def __init__(self):
        super(ServiceWatchdog, self).__init__()
        self.parser = self.build_parser()
        # pid of the process that execs the service (the fork parent).
        self.service_pid = None
        # Seconds between health checks; set by run() from the watchdog period.
        self.sleep_interval = None

    def wait(self, timeout):
        """Sleep ``timeout`` seconds in slices of at most one second.

        After every slice, exit with status 0 if our parent pid is no
        longer ``service_pid``, i.e. the service process is gone and this
        child has been reparented.
        """
        while timeout > 0:
            step = min(timeout, 1)
            time.sleep(step)
            timeout -= step
            if os.getppid() != self.service_pid:  # pid died
                sys.exit(0)

    def run(self, args=None):
        """Parse ``args``, exec the service and run the health-check loop.

        In the original process this does not return on success: the
        process is replaced by the service. Only the forked child continues
        past start_process() and loops until the service exits.
        """
        try:
            namespace = self.parser.parse_args(args)
            self.start_process(namespace.command[0], namespace.command)
            # From here on we are the forked health-check child.
            self.connect(clean_environment=namespace.clean_env)
            # Sleep half the period and cap each check at the same length
            # (see check_service), so one sleep+check cycle fits within a
            # single watchdog period.
            self.sleep_interval = self.watchdog_period() / 2
            while True:
                self.wait(self.sleep_interval)
                if self.check_service(namespace.health_cmd):
                    self.ping(pid=self.service_pid)
        except KeyboardInterrupt:
            pass

    def check_service(self, check_cmd):
        """Run ``check_cmd`` and report whether the service is healthy.

        The command is executed directly (no shell, no extra arguments). The
        check passes only if it exits with status 0 within
        ``sleep_interval`` seconds. A timeout, or a command that cannot be
        started, fails the check.
        """
        try:
            return_code = subprocess.call([check_cmd],
                                          timeout=self.sleep_interval)
        except subprocess.TimeoutExpired:
            # subprocess.call() has already killed and reaped the command.
            return False
        except OSError as err:
            # e.g. missing or non-executable command: report it and treat the
            # service as unhealthy instead of crashing the watchdog loop.
            print("health check {0!r} could not be run: {1}".format(
                check_cmd, err), file=sys.stderr, flush=True)
            return False
        return return_code == 0

    def start_process(self, cmd_file, args):
        """Fork; exec the service in the parent, return in the child.

        The parent is replaced by ``cmd_file`` with argv ``args`` and never
        returns from here; the child returns and does the health checks.

        Raises:
            OSError: if fork() fails, or (in the parent) if exec fails.
        """
        self.service_pid = os.getpid()
        # os.fork() raises OSError on failure; it never returns a negative pid.
        pid = os.fork()
        if pid > 0:
            self._disable_cloexec()
            os.execvp(cmd_file, args)
        # do health checks from child process

    def _disable_cloexec(self):
        """Clear FD_CLOEXEC on every open descriptor (Linux /proc only).

        Descriptors created by Python 3 are non-inheritable by default
        (PEP 446); clearing the flag lets every descriptor this process holds
        survive the exec into the service.
        """
        for text_fd in os.listdir('/proc/self/fd'):
            if not text_fd.isdigit():
                continue
            try:
                fd = int(text_fd)
                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, flags & ~fcntl.FD_CLOEXEC)
            except (ValueError, OSError, IOError):
                # Presumably descriptors that closed after the listing (such
                # as the one os.listdir() used for the directory); skip them.
                pass

    @classmethod
    def build_parser(cls):
        """Build the CLI parser: ``-c COMMAND [--clean-env] command...``.

        Service arguments that look like options must follow ``--`` so
        argparse does not try to parse them itself.
        """
        parser = argparse.ArgumentParser(description=cls.description)
        parser.add_argument('-c', '--health-cmd', required=True,
                            metavar="COMMAND", help="""
                            Command to check service health, on success should
                            return 0 exit code
                            """)
        parser.add_argument('--clean-env', action='store_true', help="""
                            Remove NOTIFY_SOCKET from environment to prevent access to
                            systemd bus from child process
                            """)
        parser.add_argument('command', nargs='+',
                            help="Start this command as service")
        return parser

    @classmethod
    def start(cls, args=None):
        """Convenience entry point: build an instance and run it with ``args``."""
        cls().run(args)
if __name__ == '__main__':
    # Script entry point: parse sys.argv, exec the service, keep pinging.
    ServiceWatchdog().run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment