Created
July 24, 2016 21:10
-
-
Save craigcabrey/47b4de1f99b5f2506d143f3f47b1cdff to your computer and use it in GitHub Desktop.
Python wrapper for OpenBSD's rcctl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import subprocess | |
class Service: | |
blacklist = ( | |
'accounting', | |
'check_quotas', | |
'ipsec', | |
'multicast', | |
'pf', | |
'spamd_black' | |
) | |
def __init__(self, name): | |
self._class = None | |
self._flags = None | |
self._name = name | |
self._status = None | |
self._timeout = None | |
self._user = None | |
try: | |
result = subprocess.check_output(['rcctl', 'get', name]) | |
except subprocess.CalledProcessError as e: | |
result = e.output | |
self._enabled = False | |
else: | |
self._enabled = True | |
for config in result.decode('utf-8').splitlines(): | |
pair = config.split('=') | |
if pair[0].endswith('_class'): | |
self._class = pair[1] | |
elif pair[0].endswith('_flags'): | |
self._flags = pair[1] | |
elif pair[0].endswith('_timeout'): | |
self._timeout = pair[1] | |
elif pair[0].endswith('_user'): | |
self._user = pair[1] | |
@staticmethod | |
def get_all(): | |
return Service.get('all') | |
@staticmethod | |
def get_enabled(): | |
return Service.get('on') | |
@staticmethod | |
def get(service_type): | |
result = subprocess.check_output(['rcctl', 'ls', service_type]) | |
services = result.decode('utf-8').splitlines() | |
return [ | |
Service(name) | |
for name in services | |
if name not in Service.blacklist | |
] | |
@property | |
def enabled(self): | |
return self._enabled | |
@property | |
def flags(self): | |
return self._flags | |
@property | |
def name(self): | |
return self._name | |
@property | |
def status(self): | |
if self._status: | |
update_delta = datetime.datetime.now() - self._status['updated'] | |
if update_delta.seconds < 60: | |
return self._status['value'] | |
try: | |
status = subprocess.check_output(['rcctl', 'check', self.name]) | |
except subprocess.CalledProcessError: | |
result ='failed' if self.enabled else 'disabled' | |
else: | |
if '{name}(ok)'.format(name=self.name) in status.decode('utf-8'): | |
result = 'ok' | |
else: | |
result = 'failed' | |
finally: | |
self._status = { | |
'updated': datetime.datetime.now(), | |
'value': result | |
} | |
return result | |
@property | |
def timeout(self): | |
return self._timeout | |
@property | |
def type(self): | |
return self._class | |
@property | |
def user(self): | |
return self._user | |
def start(self, force=False): | |
pass | |
def stop(self): | |
pass | |
def serialize(self): | |
return { | |
'name': self.name, | |
'enabled': self.enabled, | |
'status': self.status | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment