Skip to content

Instantly share code, notes, and snippets.

@pcuzner
Last active November 27, 2019 22:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pcuzner/ac542ce3fa9a4699bb9310b1fd5095d0 to your computer and use it in GitHub Desktop.
Save pcuzner/ac542ce3fa9a4699bb9310b1fd5095d0 to your computer and use it in GitHub Desktop.
Deploying ceph monitoring components. This is test code only, written to identify the processing flow that would need to be added to the ceph-daemon script.
import fcntl
import functools
import json
import logging
import os
import select
import shutil
import socket
import subprocess
import sys
import threading

import yaml

try:
    from ConfigParser import ConfigParser  # py2
except ImportError:
    from configparser import ConfigParser  # py3
try:
    from firewall.client import FirewallClient
except ImportError:
    # firewalld python bindings are optional - update_fw becomes a no-op
    FirewallClient = None
logger = logging.getLogger(__name__)
class Monitoring(object):
    """Static description of the monitoring stack.

    ``ports`` lists every TCP port that must be free on this host before
    deployment.  ``image_metadata`` maps a logical component name to the
    settings used to build its Container object: systemd unit name, image,
    bind mounts, resource limits and the arguments passed to the
    containerised daemon.
    """
    # 3000 = grafana (matches grafana.ini http_port), 9093/9095 match the
    # alertmanager/prometheus --web.listen-address args below; 9094 and 9100
    # are presumably the alertmanager cluster port and the node-exporter
    # default - TODO confirm
    ports = [3000, 9093, 9094, 9095, 9100]
    image_metadata = {
        "prometheus": {
            "name": "Prometheus Server",
            "unit":"prometheus",
            "image": "prom/prometheus:latest",
            # NOTE(review): 65534 is presumably 'nobody' - confirm this is
            # the uid the prom/* images expect
            "user": 65534,
            # (host_path, container_path[:selinux/mount option]) pairs
            "volumes": [
                ('/etc/prometheus', '/etc/prometheus:Z'),
                ('/var/lib/prometheus', '/prometheus:Z')
            ],
            "cpu": 2,
            "memory": '4GB',
            "args": [
                "--config.file=/etc/prometheus/prometheus.yml",
                "--storage.tsdb.path=/prometheus",
                "--web.listen-address=:9095"
            ]
        },
        "grafana": {
            "name": "Grafana Server",
            "unit":"grafana",
            "image": "grafana/grafana:latest",
            "volumes": [
                ('/etc/grafana', '/etc/grafana:Z'),
                ('/var/lib/grafana', '/var/lib/grafana:Z')
            ],
            "cpu": 2,
            "memory": '4GB'
        },
        "alertmanager": {
            "name": "Prometheus Alert Manager Service",
            "unit": "alertmanager",
            "image": "prom/alertmanager:latest",
            "user": 65534,
            "volumes": [
                ('/etc/alertmanager', '/etc/alertmanager:Z'),
                ('/var/lib/alertmanager', '/alertmanager:Z')
            ],
            "cpu": 1,
            "memory": '1GB',
            "args": [
                "--config.file=/etc/alertmanager/alertmanager.yml",
                "--storage.path=/alertmanager",
                "--web.listen-address=:9093"
            ]
        },
        "node-exporter": {
            "name": "Prometheus Node Exporter service",
            "unit": "node-exporter",
            "image": "prom/node-exporter:latest",
            "volumes": [
                ("/proc", "/host/proc:ro"),
                ("/sys", "/host/sys:ro")
            ],
            "cpu": 1,
            "memory": '1GB',
            "args": [
                "--path.procfs=/host/proc",
                "--path.sysfs=/host/sys",
                "--no-collector.timex"
            ]
        },
        # no 'unit' key: never run as a service - only used by
        # deploy_dashboards() to extract the grafana dashboard json files,
        # then discarded by add_monitoring()
        "ceph-daemon": {
            "name": "Ceph Daemon",
            "image": "ceph/daemon-base:latest-master-devel"
        }
    }
def freespace_ok(path, gb_needed):
    # type: (str, float) -> bool
    """Check free space on the filesystem holding *path*.

    Args:
        path (str): any path on the filesystem to check
        gb_needed (int/float): minimum required free space in GiB

    Returns:
        bool: True if at least gb_needed GiB is available to unprivileged
        processes, False otherwise
    """
    fs_stats = os.statvfs(path)
    # POSIX defines the block counts (f_bavail) in units of f_frsize, not
    # f_bsize; f_bavail excludes the root-reserved blocks
    free_gb = fs_stats.f_frsize * fs_stats.f_bavail / 1024 ** 3
    return free_gb >= gb_needed
def port_in_use(port_num):
    # type: (int) -> bool
    """Detect whether a TCP port is in use on the local machine.

    Args:
        port_num (int): port number to probe on 127.0.0.1

    Returns:
        bool: True if something accepted the connection, False otherwise
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect(("127.0.0.1", port_num))
    except socket.error:
        # connection refused / unreachable -> nothing is listening.
        # socket.error also works on py2, where ConnectionRefusedError
        # (used by the original code) does not exist
        return False
    else:
        return True
    finally:
        # close on every path - the original leaked the socket on failure
        s.close()
def update_fw(svcport, enable=True):
    """ Update FW configuration

    Manage the host's firewall configuration using the firewalld client
    library.

    Args:
        svcport (str) : string holding the port number, a ',' separated port list, a port-range, or service name
        enable (bool) : enable(default) or disable the fw for this svc/port

    Returns:
        none
    """
    def execute_firewall_cmd(func, *parms):
        """Execute the FW function, returning a bool to reflect success/fail"""
        try:
            func(*parms)
        except Exception as e:
            # e is a DBusException object; a port/service that is already
            # enabled still counts as success
            return e.args[0].startswith("ALREADY_ENABLED")
        else:
            return True

    if not FirewallClient:
        # no mechanism to update the FW, so simply exit
        return
    client = FirewallClient()
    default_zone = client.getDefaultZone()
    # pick the add/remove variants once instead of branching at every site
    if enable:
        port_func, svc_func = client.addPort, client.addService
    else:
        port_func, svc_func = client.removePort, client.removeService
    change_complete = False
    if "," in svcport:
        port_list = svcport.split(',')
        if not all(port.isdigit() for port in port_list):
            # invalid port list string
            return
        for port in port_list:
            change_complete = execute_firewall_cmd(port_func, default_zone, "{}".format(port), "tcp")
    elif svcport.isdigit():
        change_complete = execute_firewall_cmd(port_func, default_zone, "{}".format(svcport), "tcp")
    elif svcport.count('-') == 1:
        ports = svcport.split("-")
        if len(ports) == 2 and all(p.isdigit() for p in ports):
            # this is a range, validate low < high as integers.
            # BUG FIX: the original evaluated int(ports[0] < int(ports[1]))
            # - a misplaced parenthesis that compared a str to an int
            if int(ports[0]) < int(ports[1]):
                change_complete = execute_firewall_cmd(port_func, default_zone, "{}".format(svcport), "tcp")
            else:
                # invalid port range - not low->high
                return
        else:
            # invalid range - not integers
            return
    elif svcport in client.listServices():
        # this is a request for a valid service name
        change_complete = execute_firewall_cmd(svc_func, default_zone, svcport)
    else:
        # invalid request - not a port, port range or valid service name
        return
    if change_complete:
        # persist the change
        client.runtimeToPermanent()
def threaded(func):
    """Decorator: run *func* asynchronously in a daemon thread.

    The wrapper starts the thread immediately and returns the Thread
    object so the caller can join() it.  functools.wraps preserves the
    wrapped function's name/docstring for debugging.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        t.daemon = True  # don't block interpreter exit
        t.start()
        return t
    return wrapper
class Container(object):
    """A single monitoring container: renders its run command, installs a
    systemd unit for it and starts/queries it via systemctl."""

    def __init__(self, unit, name, user, image, volumes, cpu, memory, args):
        self.unit = unit        # systemd unit name (doubles as container name)
        self.name = name        # human readable description for the unit file
        self.user = user        # uid to run as (None = image default)
        self.image = image      # container image reference
        self.volumes = volumes  # list of (host_path, container_path[:opts])
        self.cpu = cpu          # --cpus limit
        self.memory = memory    # --memory limit
        self.args = args        # extra args appended to the run command
        self.thread = None      # holds the async image-pull thread

    @property
    def running(self):
        """Return True if the container's systemd unit is active."""
        out, err, rc = call(['systemctl', 'is-active', self.unit])
        return out.strip() == 'active'

    @property
    def run_cmd(self):
        """Render the multi-line podman/docker 'run' command used as the
        unit file's ExecStart."""
        vols = ""
        args = ""
        user = ""
        for host_path, internal_path in self.volumes:
            vols += '-v "{}:{}" \\\n'.format(host_path, internal_path)
        for a in self.args:
            args += "{} \\\n".format(a)
        # BUG FIX: compare against None so uid 0 (root) is still honoured;
        # the original truthiness test silently dropped --user=0
        if self.user is not None:
            user = '--user={} \\\n'.format(self.user)
        s = ("{container_path} run --rm --name={unit} \\\n"
             "{volumes}"
             "{user}"
             "--net=host \\\n"
             "--cpus={cpu} \\\n"
             "--memory={memory} \\\n"
             "{image}".format(container_path=container_path,
                              unit=self.unit,
                              volumes=vols,
                              user=user,
                              cpu=self.cpu,
                              memory=self.memory,
                              image=self.image))
        if args:
            # strip the trailing " \\\n" continuation from the last arg
            args = args[:-3]
            s += " \\\n{}".format(args)
        return s

    @threaded
    def pull(self):
        """Pull the container image (runs in a daemon thread)."""
        print("pull issued for {}".format(self.image))
        call([container_path, 'pull', self.image])

    def start(self):
        """Start the container through its systemd unit."""
        print("Starting {} container".format(self.unit))
        out, err, rc = call(['systemctl', 'start', self.unit])

    def install(self):
        """create a systemd unit file for this container"""
        unit = ConfigParser()
        unit.optionxform = str  # prevent configparser converting all variables to lowercase
        unit.add_section('Unit')
        unit.set('Unit', 'Description', self.name)
        unit.set('Unit', 'After', 'network.target')
        unit.add_section('Service')
        unit.set('Service', 'EnvironmentFile', '-/etc/environment')
        # '-' prefix: systemd ignores failures of these commands
        unit.set('Service', 'ExecStartPre', '-{} rm -f {}'.format(container_path, self.unit))
        unit.set('Service', 'ExecStart', self.run_cmd)
        unit.set('Service', 'ExecStop', '-{} stop {}'.format(container_path, self.unit))
        unit.set('Service', 'Restart', 'always')
        unit.set('Service', 'RestartSec', '10s')
        unit.set('Service', 'TimeoutStartSec', '120')
        unit.set('Service', 'TimeoutStopSec', '15')
        unit.add_section('Install')
        unit.set('Install', 'WantedBy', 'multi-user.target')
        write_ini(unit, '/etc/systemd/system/{}.service'.format(self.unit))
        out, err, rc = call(['systemctl', 'enable', self.unit])
def deploy_dashboards(dashboard_dir="/etc/grafana/dashboards/ceph-dashboard"):
    """Deploy the dashboards in the base image to the local filesystem

    Creates a throw-away container from the ceph-daemon base image, copies
    the grafana dashboard json files out of it into *dashboard_dir*, then
    removes the container.

    Args:
        dashboard_dir (str): target directory for the dashboard files
    """
    if not os.path.exists(dashboard_dir):
        # if dashboards dir not there, likely monitoring has not been installed yet!
        return
    daemon_image = Monitoring.image_metadata['ceph-daemon']['image']
    # Extract the dashboards - podman mount easiest, but must support docker...sigh
    # BUG FIX: the listdir check now uses dashboard_dir rather than a
    # hardcoded copy of its default value
    dashboard_state = "Refreshing" if os.listdir(dashboard_dir) else "Extracting"
    print("{} dashboards from {}".format(dashboard_state, daemon_image))
    # out holds the container ID
    out, err, rc = call([container_path, "create", "-it", "--name", "dummy", daemon_image, "sh"])
    if rc > 0:
        print("Error: unable to create a temporary container from {}".format(daemon_image))
        return
    out, err, rc = call([container_path, "cp", "{}:/etc/grafana/dashboards/ceph-dashboard/".format(out.strip()), dashboard_dir])
    if rc > 0:
        print("Error: unable to copy the dashboards out of the container")
        # fall through so the temporary container is still removed
    out, err, rc = call([container_path, "rm", "dummy"])
    if rc > 0:
        print("Error: unable to remove the temporary 'dummy' container")
def write_yml(config_data, file_name):
    """Serialise *config_data* to *file_name* as YAML (with '---' header)."""
    rendered = yaml.safe_dump(config_data, explicit_start=True, indent=2)
    with open(file_name, 'w') as handle:
        handle.write(rendered)
def write_ini(ini_object, file_name):
    """Persist a ConfigParser object to *file_name* in INI format."""
    with open(file_name, 'w') as handle:
        ini_object.write(handle)
def update_alertmanager(mgrs):
    # type: (str) -> None
    """Update the alertmanager config to reflect current mgr list"""
    # TODO - PLACEHOLDER: intended to rewrite the webhook receiver list in
    # alertmanager.yml when the set of mgr daemons changes
    pass
def add_monitoring(mgrs):
    # type: (str) -> None
    """Deploy the prometheus/grafana/alertmanager/node-exporter stack.

    Sequence: check ports + free space, pull all images concurrently,
    create the config directories, write the prometheus/grafana/
    alertmanager config files, extract the grafana dashboards, install
    and start one systemd unit per container, then open the firewall.

    Args:
        mgrs (str): comma separated mgr hostnames, used to build the
            alertmanager webhook receiver list
    """
    if any(port_in_use(p) for p in Monitoring.ports):
        # at least one port that we need is in use therefore can't deploy on this host
        return
    # check there is at least 20GB freespace in /var/lib
    if not freespace_ok('/var/lib', 20):
        # not enough freespace - warn only, deployment continues
        print("Warning: Low freespace on '/var/lib'")
    # dashboard are in the daemon-base image @ /etc/grafana/dashboards/ceph-dashboard/*.json
    containers = []
    container_lookup = dict()
    print("Waiting for container images")
    for name in Monitoring.image_metadata:
        image = Monitoring.image_metadata[name]
        container = Container(
            unit=image.get('unit', None),
            name=image['name'],
            user=image.get('user', None),
            image=image['image'],
            volumes=image.get('volumes',[]),
            cpu=image.get('cpu',1),
            memory=image.get('memory',1),
            args=image.get('args', [])
        )
        # pull() is @threaded, so all image pulls run concurrently
        container.thread = container.pull()
        containers.append(container)
        container_lookup[name] = len(containers) - 1
    # wait until container images are all pulled ? timeout? errors?
    for t in containers:
        t.thread.join()
    # Create the dirs
    # NOTE(review): 472 is presumably the grafana image's uid/gid and 65534
    # 'nobody' for the prom/* images - confirm against the images
    makedirs('/etc/grafana', 472, 472, 0o755)
    makedirs('/etc/grafana/dashboards/ceph-dashboard', 472, 472, 0o755)
    makedirs('/etc/grafana/provisioning/dashboards', 472, 472, 0o755)
    makedirs('/etc/grafana/provisioning/datasources', 472, 472, 0o755)
    makedirs('/var/lib/grafana', 472, 472, 0o755)
    makedirs('/etc/prometheus', 65534, 0, 0o755)
    makedirs('/etc/prometheus/alerting', 0, 0, 0o755)
    makedirs('/var/lib/prometheus', 65534, 0, 0o755)
    makedirs('/etc/alertmanager', 65534, 0, 0o755)
    makedirs('/var/lib/alertmanager', 65534, 0, 0o755)
    # TODO fix the SELINUX context!
    # define the configs
    yaml_config = dict()
    yaml_config['prometheus'] = {
        "global": {
            "scrape_interval": "5s",
            "evaluation_interval": "10s"
        },
        "rule_files": [
            "/etc/prometheus/alerting/*"
        ],
        "scrape_configs": [
            {
                "job_name": "prometheus",
                "static_configs": [
                    { "targets": [
                        # NOTE(review): prometheus is started with
                        # --web.listen-address=:9095, so this self-scrape
                        # target on 9092 looks like a typo - confirm
                        'localhost:9092'
                    ]}
                ]
            }
        ]
    }
    # grafana datasource provisioning: delete then recreate so reruns are
    # idempotent
    yaml_config['ds_provisioning'] = {
        "apiVersion": 1,
        "deleteDataSources": [
            {
                "name": "Dashboard",
                "orgId": 1
            }
        ],
        "datasources": [
            {
                "name": "Dashboard",
                "type": "prometheus",
                "access": "proxy",
                "orgId": 1,
                # change this url? (9092 vs the 9095 listen address above)
                "url": "http://localhost:9092",
                "basicAuth": False,
                "isDefault": True,
                "editable": False
            }
        ]
    }
    # grafana dashboard provisioning: load json files from the extracted
    # dashboards directory
    yaml_config['db_provisioning'] = {
        "apiVersion": 1,
        "providers": [
            {
                "name": "Ceph Dashboard",
                "orgId": 1,
                "folder": "ceph-dashboard",
                "type": "file",
                "disableDeletion": False,
                "updateIntervalSeconds": 3,
                "editable": False,
                "options": {
                    "path": "/etc/grafana/dashboards/ceph-dashboard"
                }
            }
        ]
    }
    # one webhook receiver per mgr - alerts are forwarded to the ceph
    # dashboard API endpoint on each
    mgr_list = [{"url": "http://{}:8443/api/prometheus_receiver".format(m)} for m in mgrs.split(',')]
    yaml_config['alertmanager'] = {
        "global": {
            "resolve_timeout": "5m"
        },
        "route": {
            "group_by": ["alertname"],
            "group_wait": "10s",
            "group_interval": "10s",
            "repeat_interval": "1h",
            "receiver": "ceph-dashboard"
        },
        "receivers": [
            {
                "name": "ceph-dashboard",
                "webhook_configs": mgr_list
            }
        ]
    }
    # grafana.ini - anonymous viewer access so the dashboard can embed it
    ini = ConfigParser()
    ini.add_section('users')
    ini.set('users', 'default_theme', 'light')
    ini.add_section('auth.anonymous')
    ini.set('auth.anonymous', 'enabled', 'true')
    ini.set('auth.anonymous', 'org_name', 'Main Org.')
    ini.set('auth.anonymous', 'org_role', 'Viewer')
    ini.add_section('server')
    ini.set('server', 'protocol', 'http')
    ini.set('server', 'http_port', '3000')
    ini.set('server', 'http_addr', '')  # bind to all IP addresses
    ini.add_section('security')
    ini.set('security', 'admin_user', 'admin')
    ini.set('security', 'admin_password', 'admin')  # <-- use the password from the invocation, or generated one
    ini.set('security', 'allow_embedding', 'true')
    # persist the config files
    print("Creating config files")
    write_ini(ini, '/etc/grafana/grafana.ini')
    write_yml(yaml_config['prometheus'], '/etc/prometheus/prometheus.yml')
    write_yml(yaml_config['ds_provisioning'], '/etc/grafana/provisioning/datasources/ceph-dashboard.yml')
    write_yml(yaml_config['alertmanager'], '/etc/alertmanager/alertmanager.yml')
    write_yml(yaml_config['db_provisioning'], '/etc/grafana/provisioning/dashboards/ceph-dashboard.yml')
    deploy_dashboards()
    # apply any selinux requirements
    # discard the ceph/daemon-base container - it was only needed for the
    # dashboard extraction, no unit is created for it
    del containers[container_lookup['ceph-daemon']]
    del container_lookup['ceph-daemon']
    # install the grafana plugins
    # create systemd units
    print("creating systemd unit files, and reload")
    for container in containers:
        container.install()
    out, err, rc = call(["systemctl", "daemon-reload"])
    # start containers via systemd
    print("starting containers with systemctl")
    for container in containers:
        container.start()
    # update firewall
    print("updating firewall rules for monitoring services")
    update_fw(','.join([str(p) for p in Monitoring.ports]))
def remove_monitoring(purge=True):
    # type: (bool) -> None
    """Stop/remove the monitoring units, configs and firewall rules.

    Args:
        purge (bool): also delete the prometheus metrics data (default True)
    """
    print("Stopping and removing systemd units")
    for name in Monitoring.image_metadata:
        container_data = Monitoring.image_metadata[name]
        if container_data.get('unit', None):
            # this is a unit we should have created
            out, err, rc = call(['systemctl', 'stop', container_data['unit']])
            unit_file = '/etc/systemd/system/{}.service'.format(container_data['unit'])
            if os.path.exists(unit_file):
                os.remove(unit_file)
    out, err, rc = call(['systemctl', 'daemon-reload'])
    print("Deleting configuration directories")
    # ignore_errors makes removal idempotent: a partially installed or
    # already-removed deployment no longer raises on a missing directory
    for cfg_dir in ('/etc/grafana', '/etc/alertmanager', '/etc/prometheus',
                    '/var/lib/grafana', '/var/lib/alertmanager'):
        shutil.rmtree(cfg_dir, ignore_errors=True)
    if purge:
        print("deleting old prometheus monitoring data")
        shutil.rmtree('/var/lib/prometheus', ignore_errors=True)
    print("Closing firewall ports")
    update_fw(','.join([str(p) for p in Monitoring.ports]), enable=False)
def command_monitoring(action, mgrs):
    """Dispatch an 'add' or 'remove' monitoring request.

    Unknown actions are silently ignored.  mgrs must be a string,
    possibly comma separated.
    """
    dispatch = {
        'add': lambda: add_monitoring(mgrs=mgrs),
        'remove': lambda: remove_monitoring(),
    }
    handler = dispatch.get(action)
    if handler:
        handler()
##################################
# Popen wrappers, lifted from ceph-volume
def call(command, desc=None, verbose=False, **kwargs):
    """
    Wrap subprocess.Popen to
    - log stdout/stderr to a logger,
    - decode utf-8
    - cleanly return out, err, returncode
    If verbose=True, log at info (instead of debug) level.
    :param command: argv list (no shell is involved)
    :param desc: logging prefix; defaults to command[0]
    :param verbose_on_failure: On a non-zero exit status, it will forcefully set
    logging ON for the terminal
    """
    if not desc:
        desc = command[0]
    verbose_on_failure = kwargs.pop('verbose_on_failure', True)
    logger.debug("Running command: %s" % ' '.join(command))
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        **kwargs
    )
    # get current p.stdout flags, add O_NONBLOCK so os.read below never
    # blocks waiting for a full buffer
    stdout_flags = fcntl.fcntl(process.stdout, fcntl.F_GETFL)
    stderr_flags = fcntl.fcntl(process.stderr, fcntl.F_GETFL)
    fcntl.fcntl(process.stdout, fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK)
    fcntl.fcntl(process.stderr, fcntl.F_SETFL, stderr_flags | os.O_NONBLOCK)
    out = ''   # full accumulated stdout, returned to the caller
    err = ''   # full accumulated stderr, returned to the caller
    reads = None
    stop = False
    out_buffer = ''  # partial line (no newline yet)
    err_buffer = ''  # partial line (no newline yet)
    while not stop:
        if reads and process.poll() is not None:
            # we want to stop, but first read off anything remaining
            # on stdout/stderr
            stop = True
        else:
            # block until at least one of the pipes has data
            reads, _, _ = select.select(
                [process.stdout.fileno(), process.stderr.fileno()],
                [], []
            )
        for fd in reads:
            try:
                message = os.read(fd, 1024)
                if not isinstance(message, str):
                    # py3: os.read returns bytes
                    message = message.decode('utf-8')
                if fd == process.stdout.fileno():
                    out += message
                    # re-join with the previous partial line, log only the
                    # complete lines, keep the trailing fragment for later
                    message = out_buffer + message
                    lines = message.split('\n')
                    out_buffer = lines.pop()
                    for line in lines:
                        if verbose:
                            logger.info(desc + ':stdout ' + line)
                        else:
                            logger.debug(desc + ':stdout ' + line)
                elif fd == process.stderr.fileno():
                    err += message
                    message = err_buffer + message
                    lines = message.split('\n')
                    err_buffer = lines.pop()
                    for line in lines:
                        if verbose:
                            logger.info(desc + ':stderr ' + line)
                        else:
                            logger.debug(desc + ':stderr ' + line)
                else:
                    assert False
            except (IOError, OSError):
                # non-blocking read with nothing available - try again on
                # the next select() pass
                pass
    returncode = process.wait()
    # flush any final partial line that never received a newline
    if out_buffer != '':
        if verbose:
            logger.info(desc + ':stdout ' + out_buffer)
        else:
            logger.debug(desc + ':stdout ' + out_buffer)
    if err_buffer != '':
        if verbose:
            logger.info(desc + ':stderr ' + err_buffer)
        else:
            logger.debug(desc + ':stderr ' + err_buffer)
    if returncode != 0 and verbose_on_failure and not verbose:
        # dump stdout + stderr
        logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
        for line in out.splitlines():
            logger.info(desc + ':stdout ' + line)
        for line in err.splitlines():
            logger.info(desc + ':stderr ' + line)
    return out, err, returncode
def call_throws(command, **kwargs):
    """Run *command* via call(); raise RuntimeError on a non-zero exit."""
    out, err, ret = call(command, **kwargs)
    if not ret:
        return out, err, ret
    raise RuntimeError('Failed command: %s' % ' '.join(command))
def makedirs(dir, uid, gid, mode):
    # type: (str, int, int, int) -> None
    """Ensure *dir* exists with the given owner, group and permissions.

    'dir' shadows the builtin but is kept for caller compatibility.
    """
    if not os.path.exists(dir):
        os.makedirs(dir, mode=mode)
    os.chown(dir, uid, gid)
    # os.makedirs' mode argument is masked by the process umask, and chown
    # can clear setuid/setgid bits, so apply the mode explicitly last.
    # (This also replaces the original's redundant double-chmod on the
    # already-exists path.)
    os.chmod(dir, mode)
if __name__ == '__main__':
    # module-level global consumed by Container and deploy_dashboards
    container_path = '/usr/bin/podman'
    # crude CLI: <script> add|remove (mgr list is hardcoded test data)
    if len(sys.argv) == 2 and sys.argv[1] in ('add', 'remove'):
        command_monitoring(action=sys.argv[1], mgrs='rh460p')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment