Last active
November 27, 2019 22:54
-
-
Save pcuzner/ac542ce3fa9a4699bb9310b1fd5095d0 to your computer and use it in GitHub Desktop.
Deploying Ceph monitoring components. This is test code only, written to identify the processing flow that would need to be added to the ceph-daemon script.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs
import errno
import fcntl
import functools
import json
import logging
import os
import select
import shutil
import socket
import subprocess
import sys
import threading

try:
    from ConfigParser import ConfigParser  # py2
except ImportError:
    from configparser import ConfigParser  # py3

import yaml

try:
    from firewall.client import FirewallClient
except ImportError:
    FirewallClient = None
logger = logging.getLogger(__name__)


class Monitoring(object):
    """Static description of the monitoring stack this script deploys.

    ports
        Every TCP port the stack needs free on the host: 3000 (Grafana),
        9093 (Alertmanager web), 9094 (presumably the Alertmanager cluster
        port - confirm), 9095 (Prometheus web) and 9100 (presumably the
        node-exporter default listen port - confirm).

    image_metadata
        Short key -> container settings (name, systemd unit, image, optional
        uid, bind mounts, cpu/memory limits, extra args). Entries are consumed
        in insertion order; "ceph-daemon" has no unit because its image is
        only used as a source of dashboards.
    """

    ports = [3000, 9093, 9094, 9095, 9100]

    image_metadata = {
        "prometheus": dict(
            name="Prometheus Server",
            unit="prometheus",
            image="prom/prometheus:latest",
            user=65534,
            volumes=[
                ('/etc/prometheus', '/etc/prometheus:Z'),
                ('/var/lib/prometheus', '/prometheus:Z'),
            ],
            cpu=2,
            memory='4GB',
            args=[
                "--config.file=/etc/prometheus/prometheus.yml",
                "--storage.tsdb.path=/prometheus",
                "--web.listen-address=:9095",
            ],
        ),
        "grafana": dict(
            name="Grafana Server",
            unit="grafana",
            image="grafana/grafana:latest",
            volumes=[
                ('/etc/grafana', '/etc/grafana:Z'),
                ('/var/lib/grafana', '/var/lib/grafana:Z'),
            ],
            cpu=2,
            memory='4GB',
        ),
        "alertmanager": dict(
            name="Prometheus Alert Manager Service",
            unit="alertmanager",
            image="prom/alertmanager:latest",
            user=65534,
            volumes=[
                ('/etc/alertmanager', '/etc/alertmanager:Z'),
                ('/var/lib/alertmanager', '/alertmanager:Z'),
            ],
            cpu=1,
            memory='1GB',
            args=[
                "--config.file=/etc/alertmanager/alertmanager.yml",
                "--storage.path=/alertmanager",
                "--web.listen-address=:9093",
            ],
        ),
        "node-exporter": dict(
            name="Prometheus Node Exporter service",
            unit="node-exporter",
            image="prom/node-exporter:latest",
            # host /proc and /sys are exposed read-only for the collectors
            volumes=[
                ("/proc", "/host/proc:ro"),
                ("/sys", "/host/sys:ro"),
            ],
            cpu=1,
            memory='1GB',
            args=[
                "--path.procfs=/host/proc",
                "--path.sysfs=/host/sys",
                "--no-collector.timex",
            ],
        ),
        # no "unit": this image is never run as a service
        "ceph-daemon": dict(
            name="Ceph Daemon",
            image="ceph/daemon-base:latest-master-devel",
        ),
    }
def freespace_ok(path, gb_needed):
    """Return True when the filesystem holding *path* has at least *gb_needed* GiB free.

    Free space is the amount available to unprivileged users (``f_bavail``).
    POSIX defines ``f_bavail`` in units of the fragment size ``f_frsize``,
    not the preferred I/O size ``f_bsize``; the two often match on Linux but
    are not guaranteed to.

    Args:
        path (str): any path on the filesystem to check.
        gb_needed (int|float): required free space in GiB (1024**3 bytes).

    Returns:
        bool: True if enough space is available.
    """
    fs_stats = os.statvfs(path)
    # defensive fallback in case f_frsize is reported as 0
    block_size = fs_stats.f_frsize or fs_stats.f_bsize
    # float() keeps the division exact under Python 2 integer division too
    free_gb = float(block_size * fs_stats.f_bavail) / 1024 ** 3
    return free_gb >= gb_needed
def port_in_use(port_num):
    """Detect whether a TCP port is in use on the local machine.

    A port is considered in use when something accepts a TCP connection on
    127.0.0.1:port_num.

    Args:
        port_num (int): TCP port number to probe.

    Returns:
        bool: True if the connection succeeds, False otherwise.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns an errno instead of raising, so this works on
        # Python 2 (which has no ConnectionRefusedError) and any connect
        # failure is reported as "not in use" instead of crashing the caller
        return s.connect_ex(("127.0.0.1", port_num)) == 0
    finally:
        # release the socket on every path, including "refused"
        s.close()
def update_fw(svcport, enable=True):
    """ Update FW configuration

    Manage the host's firewall configuration using the firewalld client
    library. Changes are made in the default zone; if any requested change
    succeeds, the runtime configuration is copied to the permanent one.

    Args:
        svcport (str) : string holding the port number, a ',' separated port
                        list, a low-high port range, or a firewalld service name
        enable (bool) : enable(default) or disable the fw for this svc/port

    Returns:
        none

    Invalid requests, and hosts without the firewalld python bindings, are
    ignored without error.
    """
    def execute_firewall_cmd(func, *parms):
        """Execute the FW function, returning a bool to reflect success/fail"""
        try:
            func(*parms)
        except Exception as e:
            # e is a DBusException object. Asking for a state that is already
            # in effect (port already open / already closed) counts as success.
            detail = " ".join(str(a) for a in e.args) or str(e)
            return any(code in detail for code in ("ALREADY_ENABLED", "NOT_ENABLED"))
        return True

    if not FirewallClient:
        # no mechanism to update the FW, so simply exit
        return

    svcport = svcport.strip()
    client = FirewallClient()
    default_zone = client.getDefaultZone()
    port_cmd = client.addPort if enable else client.removePort
    service_cmd = client.addService if enable else client.removeService

    if "," in svcport:
        port_list = [port.strip() for port in svcport.split(',')]
        if not all(port.isdigit() for port in port_list):
            # invalid port list string
            return
        requests = [(port_cmd, default_zone, port, "tcp") for port in port_list]
    elif svcport.isdigit():
        requests = [(port_cmd, default_zone, svcport, "tcp")]
    elif svcport.count('-') == 1:
        low, high = svcport.split('-')
        if not (low.isdigit() and high.isdigit()):
            # invalid range - not integers
            return
        if int(low) >= int(high):
            # invalid port range - not low->high
            return
        requests = [(port_cmd, default_zone, svcport, "tcp")]
    elif svcport in client.listServices():
        # this is a request for a valid service name
        requests = [(service_cmd, default_zone, svcport)]
    else:
        # invalid request - not a port, port range or valid service name
        return

    # every request is attempted; persist if at least one of them took effect
    change_complete = False
    for request in requests:
        if execute_firewall_cmd(*request):
            change_complete = True

    if change_complete:
        # persist the change
        client.runtimeToPermanent()
def threaded(func):
    """Decorator: run *func* in a background daemon thread.

    Calling the decorated function starts a new ``threading.Thread`` running
    *func* with the given arguments and returns that thread immediately, so
    the caller can ``join()`` it. The return value of *func* is discarded.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        # daemon thread: it does not keep the interpreter alive on exit
        t.daemon = True
        t.start()
        return t
    return wrapper
class Container(object):
    """A monitoring service run as a container and managed by a systemd unit.

    Args:
        unit (str): systemd unit name; also used as the container name.
        name (str): human readable description written to the unit file.
        user (int|None): uid passed as ``--user``; None omits the flag.
        image (str): container image reference.
        volumes (list): (host_path, container_path) bind mount pairs.
        cpu: value for ``--cpus``.
        memory: value for ``--memory``.
        args (list): extra arguments appended after the image name.
        container_path (str|None): container runtime binary. When not given,
            the module-level ``container_path`` (set in ``__main__``) is used.
    """

    def __init__(self, unit, name, user, image, volumes, cpu, memory, args,
                 container_path=None):
        self.unit = unit
        self.name = name
        self.user = user
        self.image = image
        self.volumes = volumes
        self.cpu = cpu
        self.memory = memory
        self.args = args
        self.thread = None  # the caller stores the pull() thread here
        self._container_path = container_path

    @property
    def engine(self):
        """Path of the container runtime binary used for every command."""
        if self._container_path:
            return self._container_path
        # fall back to the module-level global configured in __main__
        return container_path

    @property
    def running(self):
        """True when ``systemctl is-active`` reports the unit as active."""
        out, _, _ = call(['systemctl', 'is-active', self.unit])
        return out.strip() == 'active'

    @property
    def run_cmd(self):
        """Build the ``<engine> run`` command line used as the unit's ExecStart.

        Each option is put on its own line, joined by a trailing backslash,
        to keep the generated unit file readable.
        """
        parts = ["{} run --rm --name={}".format(self.engine, self.unit)]
        parts.extend('-v "{}:{}"'.format(host_path, internal_path)
                     for host_path, internal_path in self.volumes)
        # compare against None so an explicit uid 0 (root) is not dropped
        if self.user is not None:
            parts.append('--user={}'.format(self.user))
        parts.append('--net=host')
        parts.append('--cpus={}'.format(self.cpu))
        parts.append('--memory={}'.format(self.memory))
        parts.append('{}'.format(self.image))
        parts.extend('{}'.format(a) for a in self.args)
        return " \\\n".join(parts)

    @threaded
    def pull(self):
        """Pull the image; runs in a background thread (returns the Thread)."""
        print("pull issued for {}".format(self.image))
        call([self.engine, 'pull', self.image])

    def start(self):
        """Start this container's systemd unit."""
        print("Starting {} container".format(self.unit))
        call(['systemctl', 'start', self.unit])

    def install(self):
        """create a systemd unit file for this container, and enable it"""
        unit = ConfigParser()
        unit.optionxform = str  # prevent configparser converting all variables to lowercase

        unit.add_section('Unit')
        unit.set('Unit', 'Description', self.name)
        unit.set('Unit', 'After', 'network.target')

        unit.add_section('Service')
        unit.set('Service', 'EnvironmentFile', '-/etc/environment')
        # leading '-' lets systemd ignore a failure (e.g. no stale container to remove)
        unit.set('Service', 'ExecStartPre', '-{} rm -f {}'.format(self.engine, self.unit))
        unit.set('Service', 'ExecStart', self.run_cmd)
        unit.set('Service', 'ExecStop', '-{} stop {}'.format(self.engine, self.unit))
        unit.set('Service', 'Restart', 'always')
        unit.set('Service', 'RestartSec', '10s')
        unit.set('Service', 'TimeoutStartSec', '120')
        unit.set('Service', 'TimeoutStopSec', '15')

        unit.add_section('Install')
        unit.set('Install', 'WantedBy', 'multi-user.target')

        write_ini(unit, '/etc/systemd/system/{}.service'.format(self.unit))
        call(['systemctl', 'enable', self.unit])
def deploy_dashboards(dashboard_dir="/etc/grafana/dashboards/ceph-dashboard"):
    """Deploy the dashboards in the base image to the local filesystem.

    A throw-away container named "dummy" is created from the ceph-daemon
    image. Its /etc/grafana/dashboards/ceph-dashboard contents are copied
    into *dashboard_dir* with ``<engine> cp``, and the container is then
    removed. Removal happens even if the copy fails.

    Args:
        dashboard_dir (str): destination directory; must already exist.

    Returns:
        bool: True if the dashboards were copied. False if *dashboard_dir* is
        missing or a container command failed.
    """
    if not os.path.exists(dashboard_dir):
        # if dashboards dir not there, likely monitoring has not been installed yet!
        return False

    image = Monitoring.image_metadata['ceph-daemon']['image']
    dashboard_state = "Refreshing" if os.listdir(dashboard_dir) else "Extracting"
    print("{} dashboards from {}".format(dashboard_state, image))

    # Extract the dashboards - podman mount easiest, but must support docker...sigh
    # out holds the container ID
    out, err, rc = call([container_path, "create", "-it", "--name", "dummy", image, "sh"])
    if rc != 0:
        logger.error("Unable to create a container from %s: %s", image, err.strip())
        return False
    container_id = out.strip()

    copied = False
    try:
        # The trailing "/." copies the *contents* of the source directory.
        # Without it, cp into an existing directory nests the source
        # (ceph-dashboard/ceph-dashboard).
        _, err, rc = call([container_path, "cp",
                           "{}:/etc/grafana/dashboards/ceph-dashboard/.".format(container_id),
                           dashboard_dir])
        copied = rc == 0
        if not copied:
            logger.error("Unable to copy dashboards out of %s: %s", image, err.strip())
    finally:
        # always discard the temporary container so the fixed "dummy" name is
        # free again for the next run
        _, err, rc = call([container_path, "rm", container_id])
        if rc != 0:
            logger.warning("Unable to remove temporary container %s: %s",
                           container_id, err.strip())
    return copied
def write_yml(config_data, file_name):
    """Write *config_data* to *file_name* as a single YAML document.

    The document is rendered before the file is opened, so a serialisation
    error (e.g. a type ``safe_dump`` refuses) leaves any existing file intact
    instead of truncating it.

    Args:
        config_data: plain Python data (dicts/lists/scalars) to serialise.
        file_name (str): destination path; overwritten on success.
    """
    rendered = yaml.safe_dump(config_data, explicit_start=True, indent=2)
    with open(file_name, 'w') as yml_file:
        yml_file.write(rendered)
def write_ini(ini_object, file_name):
    """Serialise a ConfigParser-like object to *file_name* in INI syntax.

    The file is overwritten. *ini_object* only needs a ``write(fileobj)`` method.
    """
    handle = open(file_name, 'w')
    try:
        ini_object.write(handle)
    finally:
        handle.close()
def update_alertmanager(mgrs):
    """Update the alertmanager config to reflect the current mgr list.

    Not implemented yet: this is a placeholder that does nothing.

    Args:
        mgrs: manager host list (presumably the same comma separated string
            that add_monitoring accepts - confirm once implemented).
    """
    # TODO - PLACEHOLDER
    return None
def _pull_monitoring_images():
    """Create one Container per Monitoring.image_metadata entry and pull every image.

    The pulls run concurrently (Container.pull is threaded), and this waits
    for all of them. Returns a list of (metadata_key, Container) pairs in
    metadata order.
    """
    pulled = []
    for key, meta in Monitoring.image_metadata.items():
        container = Container(
            unit=meta.get('unit', None),
            name=meta['name'],
            user=meta.get('user', None),
            image=meta['image'],
            volumes=meta.get('volumes', []),
            cpu=meta.get('cpu', 1),
            memory=meta.get('memory', 1),
            args=meta.get('args', []),
        )
        container.thread = container.pull()
        pulled.append((key, container))
    # wait until container images are all pulled ? timeout? errors?
    for _, container in pulled:
        container.thread.join()
    return pulled


def _create_monitoring_dirs():
    """Create the config/data directories bind-mounted into the containers."""
    # (path, uid, gid). 472 is presumably the grafana uid/gid inside the
    # grafana image - confirm. 65534 matches the "user" configured for the
    # prometheus and alertmanager containers in Monitoring.image_metadata.
    for path, uid, gid in (
            ('/etc/grafana', 472, 472),
            ('/etc/grafana/dashboards/ceph-dashboard', 472, 472),
            ('/etc/grafana/provisioning/dashboards', 472, 472),
            ('/etc/grafana/provisioning/datasources', 472, 472),
            ('/var/lib/grafana', 472, 472),
            ('/etc/prometheus', 65534, 0),
            ('/etc/prometheus/alerting', 0, 0),
            ('/var/lib/prometheus', 65534, 0),
            ('/etc/alertmanager', 65534, 0),
            ('/var/lib/alertmanager', 65534, 0)):
        makedirs(path, uid, gid, 0o755)
    # TODO fix the SELINUX context!


def _build_monitoring_configs(mgrs):
    """Return (yaml_config, grafana_ini) holding every config file to write."""
    # Prometheus listens on :9095 (see its --web.listen-address in
    # Monitoring.image_metadata). Both its self-scrape target and the Grafana
    # datasource must point there.
    prometheus_addr = 'localhost:9095'
    # NOTE(review): node-exporter (:9100) is deployed but not scraped here yet

    yaml_config = dict()
    yaml_config['prometheus'] = {
        "global": {
            "scrape_interval": "5s",
            "evaluation_interval": "10s"
        },
        "rule_files": [
            "/etc/prometheus/alerting/*"
        ],
        "scrape_configs": [
            {
                "job_name": "prometheus",
                "static_configs": [
                    {"targets": [prometheus_addr]}
                ]
            }
        ]
    }

    yaml_config['ds_provisioning'] = {
        "apiVersion": 1,
        "deleteDataSources": [
            {
                "name": "Dashboard",
                "orgId": 1
            }
        ],
        "datasources": [
            {
                "name": "Dashboard",
                "type": "prometheus",
                "access": "proxy",
                "orgId": 1,
                "url": "http://{}".format(prometheus_addr),
                "basicAuth": False,
                "isDefault": True,
                "editable": False
            }
        ]
    }

    yaml_config['db_provisioning'] = {
        "apiVersion": 1,
        "providers": [
            {
                "name": "Ceph Dashboard",
                "orgId": 1,
                "folder": "ceph-dashboard",
                "type": "file",
                "disableDeletion": False,
                "updateIntervalSeconds": 3,
                "editable": False,
                "options": {
                    "path": "/etc/grafana/dashboards/ceph-dashboard"
                }
            }
        ]
    }

    # skip empty entries such as those produced by "a,,b" or a trailing comma
    mgr_hosts = [m.strip() for m in mgrs.split(',') if m.strip()]
    mgr_list = [{"url": "http://{}:8443/api/prometheus_receiver".format(m)} for m in mgr_hosts]
    yaml_config['alertmanager'] = {
        "global": {
            "resolve_timeout": "5m"
        },
        "route": {
            "group_by": ["alertname"],
            "group_wait": "10s",
            "group_interval": "10s",
            "repeat_interval": "1h",
            "receiver": "ceph-dashboard"
        },
        "receivers": [
            {
                "name": "ceph-dashboard",
                "webhook_configs": mgr_list
            }
        ]
    }

    ini = ConfigParser()
    ini.add_section('users')
    ini.set('users', 'default_theme', 'light')
    ini.add_section('auth.anonymous')
    ini.set('auth.anonymous', 'enabled', 'true')
    ini.set('auth.anonymous', 'org_name', 'Main Org.')
    ini.set('auth.anonymous', 'org_role', 'Viewer')
    ini.add_section('server')
    ini.set('server', 'protocol', 'http')
    ini.set('server', 'http_port', '3000')
    ini.set('server', 'http_addr', '')  # bind to all IP addresses
    ini.add_section('security')
    ini.set('security', 'admin_user', 'admin')
    ini.set('security', 'admin_password', 'admin')  # <-- use the password from the invocation, or generated one
    ini.set('security', 'allow_embedding', 'true')
    return yaml_config, ini


def add_monitoring(mgrs):
    """Deploy Prometheus, Alertmanager, node-exporter and Grafana on this host.

    Steps:
    1. Check the required ports.
    2. Pull the images.
    3. Create the directories and config files.
    4. Extract the Ceph dashboards.
    5. Install and start a systemd unit per service.
    6. Open the firewall ports.

    Args:
        mgrs (str): comma separated ceph-mgr host names. Each becomes an
            Alertmanager webhook receiver at
            http://<host>:8443/api/prometheus_receiver.
    """
    busy = [p for p in Monitoring.ports if port_in_use(p)]
    if busy:
        # at least one port that we need is in use therefore can't deploy on this host
        print("Unable to deploy monitoring, port(s) already in use: {}".format(
            ', '.join(str(p) for p in busy)))
        return

    # check there is at least 20GB freespace in /var/lib
    if not freespace_ok('/var/lib', 20):
        # not enough freespace
        print("Warning: Low freespace on '/var/lib'")

    # dashboard are in the daemon-base image @ /etc/grafana/dashboards/ceph-dashboard/*.json
    print("Waiting for container images")
    pulled = _pull_monitoring_images()

    _create_monitoring_dirs()

    yaml_config, grafana_ini = _build_monitoring_configs(mgrs)

    # persist the config files
    print("Creating config files")
    write_ini(grafana_ini, '/etc/grafana/grafana.ini')
    write_yml(yaml_config['prometheus'], '/etc/prometheus/prometheus.yml')
    write_yml(yaml_config['ds_provisioning'], '/etc/grafana/provisioning/datasources/ceph-dashboard.yml')
    write_yml(yaml_config['alertmanager'], '/etc/alertmanager/alertmanager.yml')
    write_yml(yaml_config['db_provisioning'], '/etc/grafana/provisioning/dashboards/ceph-dashboard.yml')

    deploy_dashboards()

    # apply any selinux requirements
    # the ceph/daemon-base image was only needed as a dashboard source; every
    # other entry is a service that gets a systemd unit
    services = [container for key, container in pulled if key != 'ceph-daemon']

    # install the grafana plugins
    # create systemd units
    print("creating systemd unit files, and reload")
    for container in services:
        container.install()
    call(["systemctl", "daemon-reload"])

    # start containers via systemd
    print("starting containers with systemctl")
    for container in services:
        container.start()

    # update firewall
    print("updating firewall rules for monitoring services")
    update_fw(','.join(str(p) for p in Monitoring.ports))
def remove_monitoring(purge=True):
    """Stop and remove the monitoring services deployed by add_monitoring.

    Each service unit is stopped, disabled and its unit file deleted. Then the
    configuration/data directories are removed and the firewall ports closed.
    Directories that are already gone are skipped, so removal can be re-run
    after a partial install or an earlier removal.

    Args:
        purge (bool): also delete the collected Prometheus data in
            /var/lib/prometheus (default True).
    """
    print("Stopping and removing systemd units")
    for container_data in Monitoring.image_metadata.values():
        unit = container_data.get('unit', None)
        if not unit:
            # not a unit we created (e.g. the ceph-daemon dashboard source)
            continue
        call(['systemctl', 'stop', unit])
        # install() ran "systemctl enable"; disable before deleting the unit
        # file so no dangling wants/ symlink is left behind
        call(['systemctl', 'disable', unit])
        unit_file = '/etc/systemd/system/{}.service'.format(unit)
        if os.path.exists(unit_file):
            os.remove(unit_file)
    call(['systemctl', 'daemon-reload'])

    print("Deleting configuration directories")
    for path in ('/etc/grafana', '/etc/alertmanager', '/etc/prometheus',
                 '/var/lib/grafana', '/var/lib/alertmanager'):
        if os.path.isdir(path):
            shutil.rmtree(path)

    if purge:
        print("deleting old prometheus monitoring data")
        if os.path.isdir('/var/lib/prometheus'):
            shutil.rmtree('/var/lib/prometheus')

    print("Closing firewall ports")
    update_fw(','.join(str(p) for p in Monitoring.ports), enable=False)
def command_monitoring(action, mgrs):
    """Dispatch a monitoring sub-command.

    Args:
        action (str): 'add' deploys the stack, 'remove' tears it down; any
            other value is ignored.
        mgrs (str): mgr host string (possibly comma separated), used by 'add'.
    """
    handlers = {
        'add': lambda: add_monitoring(mgrs=mgrs),
        'remove': lambda: remove_monitoring(),
    }
    handler = handlers.get(action)
    if handler is not None:
        handler()
##################################
# Popen wrappers, lifted from ceph-volume


def call(command, desc=None, verbose=False, **kwargs):
    """
    Wrap subprocess.Popen to

    - log stdout/stderr to a logger,
    - decode utf-8
    - cleanly return out, err, returncode

    If verbose=True, log at info (instead of debug) level.

    :param verbose_on_failure: On a non-zero exit status, it will forcefully set
                               logging ON for the terminal

    Both pipes are read until end-of-file, not merely until the process
    exits, so output still buffered in a pipe is not dropped. Bytes are
    decoded incrementally, so a multi-byte UTF-8 character split across two
    reads decodes correctly. Invalid UTF-8 is replaced rather than raising.
    """
    if not desc:
        desc = command[0]
    verbose_on_failure = kwargs.pop('verbose_on_failure', True)
    log = logger.info if verbose else logger.debug

    logger.debug("Running command: %s" % ' '.join(command))
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        **kwargs
    )

    out_fd = process.stdout.fileno()
    err_fd = process.stderr.fileno()

    # Per-stream state:
    #   tag     - label used in log lines
    #   chunks  - all decoded text (joined into out/err at the end)
    #   partial - text after the last newline, not logged yet
    #   decoder - incremental UTF-8 decoder
    streams = {}
    for fd, tag in ((out_fd, ':stdout '), (err_fd, ':stderr ')):
        # non-blocking, so a spurious select() wake-up cannot hang os.read()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        streams[fd] = {
            'tag': tag,
            'chunks': [],
            'partial': '',
            'decoder': codecs.getincrementaldecoder('utf-8')('replace'),
        }

    def consume(state, text):
        """Record decoded *text* for a stream and log every completed line."""
        state['chunks'].append(text)
        lines = (state['partial'] + text).split('\n')
        state['partial'] = lines.pop()
        for line in lines:
            log(desc + state['tag'] + line)

    open_fds = [out_fd, err_fd]
    while open_fds:
        readable, _, _ = select.select(open_fds, [], [])
        for fd in readable:
            state = streams[fd]
            try:
                data = os.read(fd, 4096)
            except (IOError, OSError) as e:
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
                    continue  # transient: wait in select() again
                data = b''  # any other read error: treat as end-of-stream
            if data:
                consume(state, state['decoder'].decode(data))
            else:
                # EOF: flush bytes held by the decoder and stop watching fd
                consume(state, state['decoder'].decode(b'', final=True))
                open_fds.remove(fd)

    returncode = process.wait()
    process.stdout.close()
    process.stderr.close()

    # log any trailing text that had no terminating newline
    for fd in (out_fd, err_fd):
        state = streams[fd]
        if state['partial'] != '':
            log(desc + state['tag'] + state['partial'])

    out = ''.join(streams[out_fd]['chunks'])
    err = ''.join(streams[err_fd]['chunks'])

    if returncode != 0 and verbose_on_failure and not verbose:
        # dump stdout + stderr
        logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
        for line in out.splitlines():
            logger.info(desc + ':stdout ' + line)
        for line in err.splitlines():
            logger.info(desc + ':stderr ' + line)

    return out, err, returncode
def call_throws(command, **kwargs):
    """Run *command* through call() and raise if it exits non-zero.

    Returns:
        tuple: (out, err, returncode) from call() when the command succeeds.

    Raises:
        RuntimeError: if the command's exit code is non-zero.
    """
    out, err, ret = call(command, **kwargs)
    if ret == 0:
        return out, err, ret
    raise RuntimeError('Failed command: %s' % ' '.join(command))
def makedirs(dir, uid, gid, mode):
    # type: (str, int, int, int) -> None
    """Ensure directory *dir* exists with the given owner and permission bits.

    Missing parent directories are created too. An existing directory has
    its mode and ownership reset.
    """
    target = dir
    if os.path.exists(target):
        os.chmod(target, mode)
    else:
        os.makedirs(target, mode=mode)
    os.chown(target, uid, gid)
    # os.makedirs() applies the umask to *mode*, so set it explicitly
    os.chmod(target, mode)
if __name__ == '__main__':
    # container runtime used by every container command in this script
    container_path = '/usr/bin/podman'

    # usage: <script> add|remove [mgr-host[,mgr-host...]]
    # The mgr host list is optional and defaults to the test host 'rh460p'.
    if len(sys.argv) in (2, 3) and sys.argv[1] in ['add', 'remove']:
        mgr_hosts = sys.argv[2] if len(sys.argv) == 3 else 'rh460p'
        command_monitoring(action=sys.argv[1], mgrs=mgr_hosts)
    else:
        sys.stderr.write(
            "usage: {} add|remove [mgr-host[,mgr-host...]]\n".format(sys.argv[0]))
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment