Created
April 11, 2015 22:38
-
-
Save jcsp/00bebd44521ebfc4f9c4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# License: LGPL2.1 or later | |
from glob import glob | |
import os | |
import re | |
import socket | |
import subprocess | |
import struct | |
import json | |
import requests | |
import sys | |
# Ceph entity name for the admin client (presumably the identity used when
# talking to the cluster -- confirm against callers).
# FIXME: We probably can't assume that <clustername>.client.admin.keyring is
# always present, although this is the case on a nicely ceph-deploy'd system.
RADOS_NAME = "client.admin"
class MonitoringError(Exception):
    """
    Base class for the errors raised while gathering Ceph service information.
    """
class RadosError(MonitoringError):
    """
    Raised when talking to Ceph through librados goes wrong.
    """
class AdminSocketError(MonitoringError):
    """
    Raised when talking to a daemon through its /var/run/ceph admin socket goes wrong.
    """
# This function was borrowed from /usr/bin/ceph: we should
# get ceph's python code into site-packages so that we
# can reuse things like this instead of copying them.
def admin_socket(asok_path, cmd, fmt=''):
    """
    Send a daemon (--admin-daemon) command to a Ceph admin socket.

    The daemon's command descriptions are fetched first and used (via
    ceph_argparse) to validate ``cmd`` before the real request is sent.

    :param asok_path: path to the daemon's admin socket
    :param cmd: the command as a list of strings, e.g. ['config', 'show'].
                As a special case, the string 'get_command_descriptions'
                returns the raw command-description reply unvalidated.
    :param fmt: output format (e.g. 'json'); when non-empty it is sent as
                the command's 'format' field
    :return: the raw reply payload read from the socket
    :raises AdminSocketError: if talking to the socket fails, the reply is
                              truncated, or ``cmd`` is not a valid command
    """
    from ceph_argparse import parse_json_funcsigs, validate_command

    def recv_exactly(sock, count):
        """ helper: read exactly `count` bytes, or fewer if the peer hangs up """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                # Peer closed the connection: stop instead of spinning
                # forever on empty reads.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def do_sockio(path, cmd):
        """ helper: do all the actual low-level stream I/O """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(path)
            try:
                # Requests are NUL-terminated; the reply is a 4-byte
                # big-endian length followed by that many payload bytes.
                sock.sendall(cmd + '\0')
                len_str = recv_exactly(sock, 4)
                if len(len_str) < 4:
                    raise RuntimeError("no data returned from admin socket")
                length, = struct.unpack(">I", len_str)
                ret = recv_exactly(sock, length)
                if len(ret) < length:
                    raise RuntimeError(
                        "admin socket closed after %d of %d bytes" % (len(ret), length))
            except Exception as e:
                raise AdminSocketError('exception: ' + str(e))
            return ret
        finally:
            # Always release the file descriptor, on success and on error.
            sock.close()

    try:
        cmd_json = do_sockio(asok_path,
                             json.dumps({"prefix": "get_command_descriptions"}))
    except Exception as e:
        raise AdminSocketError('exception getting command descriptions: ' + str(e))

    if cmd == 'get_command_descriptions':
        return cmd_json

    sigdict = parse_json_funcsigs(cmd_json, 'cli')
    valid_dict = validate_command(sigdict, cmd)
    if not valid_dict:
        raise AdminSocketError('invalid command')

    if fmt:
        valid_dict['format'] = fmt

    try:
        ret = do_sockio(asok_path, json.dumps(valid_dict))
    except Exception as e:
        raise AdminSocketError('exception: ' + str(e))
    return ret
def ceph_command(cluster_name, command_args):
    """
    Execute the ``ceph`` command-line tool and capture its output.

    This is an escape hatch: it lets arbitrary CLI operations be run by hand
    when Calamari proper lacks (or has a broken) equivalent.

    :param cluster_name: Ceph cluster name; when falsy, no --cluster option
                         is passed
    :param command_args: list of CLI arguments, not including the leading 'ceph'
    :return: dict with 'out' (captured stdout), 'err' (captured stderr) and
             'status' (the process exit code)
    """
    prefix = ["ceph", "--cluster", cluster_name] if cluster_name else ["ceph"]
    proc = subprocess.Popen(prefix + command_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out, err = proc.communicate()
    return {'out': out, 'err': err, 'status': proc.returncode}
def _get_config(cluster_name):
    """
    Fetch the running configuration from a monitor on this host.

    Assumes a mon for ``cluster_name`` is running here: the first matching
    mon admin socket is asked for ``config show`` in JSON format.

    :param cluster_name: Ceph cluster name used to locate the mon's admin socket
    :return: the JSON-encoded config object returned by the socket
    :raises AdminSocketError: if no mon admin socket exists for the cluster
    """
    pattern = "/var/run/ceph/{cluster_name}-mon.*.asok".format(cluster_name=cluster_name)
    candidates = glob(pattern)
    if not candidates:
        raise AdminSocketError("Cannot find mon socket for %s" % cluster_name)
    return admin_socket(candidates[0], ['config', 'show'], 'json')
def get_services():
    """
    Discover the Ceph daemons running on this host via their admin sockets.

    Every ``/var/run/ceph/*.asok`` socket is interrogated with
    ``service_status``. Sockets whose name can't be parsed, or whose daemon
    doesn't answer sensibly, are left out of the result.

    :return: dict mapping (cluster, type, id) tuples to the dict returned by
             ``service_status``; empty if the ``rados`` module (i.e. Ceph)
             is not installed
    """
    try:
        import rados
    except ImportError:
        # Ceph isn't installed, report no services
        return {}

    services = {}
    for filename in glob("/var/run/ceph/*.asok"):
        try:
            service_data = service_status(filename)
        except (rados.Error, MonitoringError, ValueError, KeyError):
            # Stale socket, unresponsive daemon or malformed JSON reply:
            # exclude this service rather than failing the whole scan.
            continue
        if not service_data:
            # The socket filename didn't look like <cluster>-<type>.<id>.asok;
            # previously this fell through and crashed subscripting None.
            continue
        key = (service_data['cluster'], service_data['type'], service_data['id'])
        services[key] = service_data
    return services
def service_status(socket_path):
    """
    Given an admin socket path, learn all we can about that service.

    The cluster name, service type and service id are parsed from the socket
    filename, which must look like ``<cluster>-<type>.<id>.asok`` (for
    example ``ceph-osd.0.asok``). The FSID and version are then queried over
    the socket.

    :param socket_path: path to a daemon admin socket
    :return: dict with keys 'cluster', 'type', 'id', 'fsid', 'status' and
             'version', or None if the filename doesn't match the pattern
             above. 'status' is the parsed ``mon_status`` reply for mons and
             None for every other service type.
    :raises AdminSocketError: if the daemon cannot be queried. ValueError or
             KeyError may also propagate if a reply is not the expected JSON.
    """
    # Raw string: the '\.' escapes belong to the regex, not the Python literal.
    match = re.match(r"^(.+?)-(.+?)\.(.+)\.asok$", os.path.basename(socket_path))
    if match is None:
        return None
    cluster_name, service_type, service_id = match.groups()

    status = None
    if service_type != 'mon':
        # Interrogate the service for its FSID
        try:
            fsid = json.loads(admin_socket(socket_path, ['status'], 'json'))['cluster_fsid']
        except AdminSocketError:
            # older osd/mds daemons don't support 'status'; try our best
            config = json.loads(admin_socket(socket_path, ['config', 'get', 'fsid'], 'json'))
            fsid = config['fsid']
    else:
        # For mons, we send some extra info here, because if they're out
        # of quorum we may not find out from the cluster heartbeats, so
        # need to use the service heartbeats to detect that.
        status = json.loads(admin_socket(socket_path, ['mon_status'], 'json'))
        fsid = status['monmap']['fsid']

    version_response = admin_socket(socket_path, ['version'], 'json')
    # admin_socket raises on failure rather than returning None, so this
    # guard is only defensive.
    if version_response is not None:
        service_version = json.loads(version_response)['version']
    else:
        service_version = None

    return {
        'cluster': cluster_name,
        'type': service_type,
        'id': service_id,
        'fsid': fsid,
        'status': status,
        'version': service_version
    }
def announce():
    """
    Register every Ceph daemon running on this host with the local Consul agent.

    Each daemon found by ``get_services`` becomes a Consul service named
    ``ceph-<type>`` with ID ``<cluster>-<type>.<id>``. Its script health check
    runs ``ceph --cluster <cluster> daemon <type>.<id> help`` every 10 seconds.
    The ID and HTTP status of each registration are printed.

    :raises RuntimeError: if the Consul agent answers a registration with
                          anything other than HTTP 200; the remaining
                          services are then not registered
    """
    register_url = "http://localhost:8500/v1/agent/service/register"
    for cluster, svc_type, svc_id in get_services():
        consul_service = {
            "name": "ceph-{0}".format(svc_type),
            "id": "{0}-{1}.{2}".format(cluster, svc_type, svc_id),
            "tags": [],
            "checks": [
                {
                    # Pass --cluster so the check finds the daemon's admin
                    # socket even when the cluster isn't named 'ceph'.
                    "script": "ceph --cluster {0} daemon {1}.{2} help".format(
                        cluster, svc_type, svc_id),
                    "interval": "10s"
                }
            ]
        }
        # Consul documents this agent endpoint as PUT (newer agents reject
        # other methods).
        response = requests.put(register_url, data=json.dumps(consul_service))
        print(u"{0} {1}".format(consul_service['id'], response.status_code))
        if response.status_code != 200:
            # Explicit raise instead of assert, which is stripped under -O.
            raise RuntimeError("Consul rejected registration of %s: HTTP %s %s" % (
                consul_service['id'], response.status_code, response.text))
def _consul_get_json(path):
    """Fetch ``path`` from the local Consul agent's HTTP API and decode the JSON reply."""
    response = requests.get("http://localhost:8500/v1/" + path)
    if response.status_code != 200:
        # Explicit raise instead of assert, which is stripped under -O.
        raise RuntimeError("Consul request for %s failed: HTTP %s" % (
            path, response.status_code))
    return response.json()


def report():
    """
    Print the health of every service in the Consul catalog, grouped by node.

    For each catalog node, the node name is printed, followed by an indented
    ``<service id> <status>`` line for each of its services that has a health
    check. Services without a check are skipped.

    :raises RuntimeError: if a Consul catalog/health request does not answer
                          with HTTP 200
    """
    for node in _consul_get_json("catalog/nodes"):
        node_name = node['Node']
        print(node_name)
        node_info = _consul_get_json("catalog/node/{0}".format(node_name))
        checks = _consul_get_json("health/node/{0}".format(node_name))
        # If a service has several checks, only the last one listed is kept.
        health_by_service = {check['ServiceID']: check['Status'] for check in checks}
        # Consul may return null for a node that vanished since the listing.
        services = (node_info or {}).get('Services') or {}
        for service_id in services:
            health = health_by_service.get(service_id, "")
            if not health:
                continue
            print(u"  {0} {1}".format(service_id, health))
# TODO: also announce SCSI devices here and give them a
# "smartctl -a <dev> | grep PASSED" health check.

if __name__ == '__main__':
    # Subcommands:
    #   announce: a hook to be run on boot/config changes/service starts to
    #             update Consul's view of Ceph.
    #     TODO: don't re-register a service that isn't being modified: each
    #           registration resets its checks to critical (fix this in Consul?)
    #     TODO: also announce non-running services (i.e. MDSs from /var/lib,
    #           OSDs from their partition IDs)
    #     TODO: also remove any services that don't exist any more
    #   report: print the health of every service in the Consul catalog.
    commands = {'announce': announce, 'report': report}
    if len(sys.argv) < 2 or sys.argv[1] not in commands:
        sys.stderr.write("usage: {0} {{announce|report}}\n".format(sys.argv[0]))
        sys.exit(2)
    commands[sys.argv[1]]()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment