Skip to content

Instantly share code, notes, and snippets.

@maurobaraldi
Last active July 8, 2022 01:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maurobaraldi/abf43370627298e7594307b6763f63bc to your computer and use it in GitHub Desktop.
Save maurobaraldi/abf43370627298e7594307b6763f63bc to your computer and use it in GitHub Desktop.
Self healing systemd services if it is in failed status.
#!/usr/bin/env python3
from subprocess import (
Popen,
PIPE
)
class Service(object):
def __init__(self, service_name):
self.service = f"{service_name}.service"
def __run__(self, action):
with Popen(["systemctl", action, self.service], stdout=PIPE, stderr=PIPE) as process:
return {'status': process.stdout.read(), 'error': process.stderr.read()}
def reload(self):
self.__run__('reload')
def restart(self):
self.__run__('restart')
def start(self):
self.__run__('start')
@property
def status(self):
with Popen(["systemctl", "show", "-p", "SubState", "-p", "ActiveState", self.service"], stdout=PIPE, stderr=PIPE) as process:
res = [item.decode('utf-8').split('=') for item in process.stdout.readlines()]
return {i[0]: i[1].strip() for i in res}
def stop(self):
self.__run__('start')
def heal(self):
if self.status.get('ActiveState') == 'failed':
if self.status.get('SubState') == 'failed':
self.reload()
self.restart()
else:
self.reload()
return self.status
## Usage
#
# sshd = Service('sshd')
# #check status
# if sshd.status.get('SubStatus') == 'failed':
# sshd.heal()
## Alternative implementation: Add an initializer tho convert it from a simple lib to act as a script also,
#
# if __name__ == __main__:
# from sys import argv
# if len(argv) > 1:
# svc = Service(argv)
# if sshd.status.get('SubStatus') == 'failed':
# sshd.heal()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment