Skip to content

Instantly share code, notes, and snippets.

@jcsp
Created April 11, 2015 22:38
Show Gist options
  • Save jcsp/00bebd44521ebfc4f9c4 to your computer and use it in GitHub Desktop.
Save jcsp/00bebd44521ebfc4f9c4 to your computer and use it in GitHub Desktop.
# License: LGPL2.1 or later
from glob import glob
import os
import re
import socket
import subprocess
import struct
import json
import requests
import sys
# Ceph entity name; presumably the identity meant for librados connections.
# FIXME: We probably can't assume that <clustername>.client.admin.keyring is always
# present, although this is the case on a nicely ceph-deploy'd system.
# NOTE(review): nothing in the visible part of this file reads RADOS_NAME --
# confirm whether it is still needed.
RADOS_NAME = 'client.admin'
class MonitoringError(Exception):
    """
    Common base class for RadosError and AdminSocketError.
    """
class RadosError(MonitoringError):
    """
    Raised when communication with Ceph through librados fails.
    """
class AdminSocketError(MonitoringError):
    """
    Raised when communication with Ceph through a /var/run/ceph admin
    socket fails.
    """
# This function borrowed from /usr/bin/ceph: we should
# get ceph's python code into site-packages so that we
# can borrow things like this.
def admin_socket(asok_path, cmd, fmt=''):
    """
    Send a daemon (--admin-daemon) command 'cmd' and return the raw reply.

    The daemon's command descriptions are always fetched first and used to
    validate 'cmd' before it is sent.

    :param asok_path: path to the admin socket
    :param cmd: the command as a list of strings, or the literal string
                'get_command_descriptions' to get the description table itself
    :param fmt: if non-empty, stored as the 'format' key of the validated
                command before it is sent (callers in this file pass 'json')
    :return: the reply payload exactly as read from the socket (a plain str on
             Python 2, bytes on Python 3)
    :raises AdminSocketError: if socket I/O fails, or validate_command() does
                              not return a command for 'cmd'
    """
    # Imported here rather than at module level, so the module itself can be
    # imported when ceph_argparse is not available.
    from ceph_argparse import parse_json_funcsigs, validate_command

    def do_sockio(path, cmd):
        """ helper: do all the actual low-level stream I/O """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            # connect() stays outside the inner try so that its errors are
            # reported with the same message as before.
            sock.connect(path)
            try:
                # json.dumps() output is ASCII, so encoding is a no-op on
                # Python 2 and yields the required bytes on Python 3.
                sock.sendall(cmd.encode('utf-8') + b'\0')
                len_str = sock.recv(4)
                if len(len_str) < 4:
                    raise RuntimeError("no data returned from admin socket")
                # Reply is prefixed with its length as a big-endian uint32
                length, = struct.unpack(">I", len_str)
                chunks = []
                remaining = length
                while remaining > 0:
                    chunk = sock.recv(remaining)
                    if not chunk:
                        # Peer closed the connection early; without this check
                        # the loop would never terminate.
                        raise RuntimeError(
                            "admin socket closed with %d bytes outstanding" % remaining)
                    chunks.append(chunk)
                    remaining -= len(chunk)
                ret = b''.join(chunks)
            except Exception as e:
                raise AdminSocketError('exception: ' + str(e))
        finally:
            # Always release the descriptor, on success and on failure
            sock.close()
        return ret

    try:
        cmd_json = do_sockio(asok_path,
                             json.dumps({"prefix": "get_command_descriptions"}))
    except Exception as e:
        raise AdminSocketError('exception getting command descriptions: ' + str(e))

    if cmd == 'get_command_descriptions':
        return cmd_json

    sigdict = parse_json_funcsigs(cmd_json, 'cli')
    valid_dict = validate_command(sigdict, cmd)
    if not valid_dict:
        raise AdminSocketError('invalid command')

    if fmt:
        valid_dict['format'] = fmt

    try:
        ret = do_sockio(asok_path, json.dumps(valid_dict))
    except Exception as e:
        raise AdminSocketError('exception: ' + str(e))

    return ret
def ceph_command(cluster_name, command_args):
    """
    Invoke the 'ceph' command line tool and capture its result. Intended as
    an escape hatch for running arbitrary commands by hand when something is
    absent or broken in Calamari proper.

    :param cluster_name: Ceph cluster name, or None to run without --cluster argument
    :param command_args: Command line, excluding the leading 'ceph' part.
    :return: dict with 'out' (captured stdout), 'err' (captured stderr) and
             'status' (process return code)
    """
    cluster_opts = ["--cluster", cluster_name] if cluster_name else []
    argv = ["ceph"] + cluster_opts + command_args

    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured_out, captured_err = proc.communicate()

    return {
        'out': captured_out,
        'err': captured_err,
        'status': proc.returncode
    }
def _get_config(cluster_name):
    """
    Ask a mon running on this server for its configuration, via the first
    admin socket matching /var/run/ceph/<cluster_name>-mon.*.asok.

    :param cluster_name: Ceph cluster name used to locate the mon socket
    :return: JSON-encoded config object, as returned by 'config show'
    :raises AdminSocketError: if no mon socket exists for this cluster
    """
    pattern = "/var/run/ceph/{cluster_name}-mon.*.asok".format(cluster_name=cluster_name)
    candidates = glob(pattern)
    if not candidates:
        raise AdminSocketError("Cannot find mon socket for %s" % cluster_name)

    return admin_socket(candidates[0], ['config', 'show'], 'json')
def get_services():
    """
    Interrogate every admin socket matching /var/run/ceph/*.asok.

    Sockets that fail to answer (stale or unresponsive) and socket files whose
    name service_status() cannot parse are left out of the result.

    :return: dict mapping (cluster name, service type, service id) tuples to
             the dict returned by service_status(); empty when the rados
             module cannot be imported
    """
    try:
        import rados
    except ImportError:
        # Ceph isn't installed, report no services
        return {}

    # (cluster, type, id) tuple to service dict
    services = {}

    # For each admin socket, try to interrogate the service
    for filename in glob("/var/run/ceph/*.asok"):
        try:
            service_data = service_status(filename)
        except (rados.Error, MonitoringError):
            # Failed to get info for this service, stale socket or unresponsive,
            # exclude it from report
            continue

        if not service_data:
            # service_status() returns None for names it cannot parse; skip
            # the file instead of subscripting None below.
            continue

        service_name = (service_data['cluster'], service_data['type'], service_data['id'])
        services[service_name] = service_data

    return services
def service_status(socket_path):
    """
    Given an admin socket path, learn all we can about that service

    :param socket_path: path to an admin socket named
                        '<cluster>-<type>.<id>.asok'
    :return: dict with keys 'cluster', 'type', 'id', 'fsid', 'status' and
             'version', or None if the file name does not have the expected
             form. 'status' is the parsed 'mon_status' reply for mons and
             None for every other service type.
    :raises AdminSocketError: propagated from admin_socket() when the daemon
                              cannot be queried
    """
    match = re.match(r"^(.+?)-(.+?)\.(.+)\.asok$", os.path.basename(socket_path))
    if match is None:
        return None
    cluster_name, service_type, service_id = match.groups()

    status = None

    # Interrogate the service for its FSID
    if service_type == 'mon':
        # For mons, we send some extra info here, because if they're out
        # of quorum we may not find out from the cluster heartbeats, so
        # need to use the service heartbeats to detect that.
        status = json.loads(admin_socket(socket_path, ['mon_status'], 'json'))
        fsid = status['monmap']['fsid']
    else:
        try:
            fsid = json.loads(admin_socket(socket_path, ['status'], 'json'))['cluster_fsid']
        except AdminSocketError:
            # older osd/mds daemons don't support 'status'; try our best
            config = json.loads(admin_socket(socket_path, ['config', 'get', 'fsid'], 'json'))
            fsid = config['fsid']

    version_response = admin_socket(socket_path, ['version'], 'json')
    if version_response is not None:
        service_version = json.loads(version_response)['version']
    else:
        service_version = None

    return {
        'cluster': cluster_name,
        'type': service_type,
        'id': service_id,
        'fsid': fsid,
        'status': status,
        'version': service_version
    }
def announce():
    """
    Register every service found by get_services() with the Consul agent
    at localhost:8500.

    Each service is POSTed to /v1/agent/service/register as 'ceph-<type>'
    with id '<cluster>-<type>.<id>' and a 'ceph daemon <type>.<id> help'
    script check on a 10s interval. The service id and the HTTP status code
    of each registration are printed.

    :raises MonitoringError: if a registration is answered with anything
                             other than HTTP 200
    """
    # get_services() keys are (cluster, type, id) tuples; the values are not
    # needed here.
    for cluster, svc_type, svc_id in get_services():
        consul_service = {
            "name": "ceph-{0}".format(svc_type),
            "id": "{0}-{1}.{2}".format(cluster, svc_type, svc_id),
            "tags": [],
            "checks": [
                {
                    "script": "ceph daemon {0}.{1} help".format(svc_type, svc_id),
                    "interval": "10s"
                }
            ]
        }
        response = requests.post("http://localhost:8500/v1/agent/service/register",
                                 data=json.dumps(consul_service))
        print("%s %s" % (consul_service['id'], response.status_code))
        # Explicit check rather than assert, which is stripped under -O
        if response.status_code != 200:
            raise MonitoringError("Consul registration of %s failed with HTTP %s" % (
                consul_service['id'], response.status_code))
def report():
    """
    Print each node in the Consul catalog, followed by the health status of
    those of its services that have a check with a non-empty status.

    Nodes are read from /v1/catalog/nodes, their services from
    /v1/catalog/node/<node> and their checks from /v1/health/node/<node>,
    all on the Consul agent at localhost:8500.

    :raises MonitoringError: if the node list request is answered with
                             anything other than HTTP 200
    """
    r = requests.get("http://localhost:8500/v1/catalog/nodes")
    # Explicit check rather than assert, which is stripped under -O
    if r.status_code != 200:
        raise MonitoringError("Consul node listing failed with HTTP %s" % r.status_code)

    for node in r.json():
        node_name = node['Node']
        print(node_name)

        status = requests.get("http://localhost:8500/v1/catalog/node/{0}".format(node_name)).json()
        checks = requests.get("http://localhost:8500/v1/health/node/{0}".format(node_name)).json()

        # Index checks by the service they belong to; a later check for the
        # same service replaces an earlier one.
        checks_by_service = {}
        for check in checks:
            checks_by_service[check['ServiceID']] = check

        for service_id in status['Services']:
            if service_id not in checks_by_service:
                continue
            health = checks_by_service[service_id]['Status']
            if not health:
                continue
            print("  %s %s" % (service_id, health))
# Hey, let's also announce SCSI devices here and give them a "smartctl -a <dev> | grep PASSED" health check
if len(sys.argv) < 2:
    # No sub-command given: exit with a usage message (exit status 1) rather
    # than an IndexError traceback.
    raise SystemExit("usage: %s announce|report" % (sys.argv[0] if sys.argv else "<script>"))
elif sys.argv[1] == 'announce':
    # This is a hook to be run on boot/config changes/service starts to update Consul's view of ceph
    # TODO: don't POST a service that isn't being modified: each time we POST it, its checks get reset to critical
    # (fix this in Consul?)
    # TODO: also announce non-running services (i.e. MDSs from /var/lib, OSDs from their partition IDs)
    # TODO: also remove any services that don't exist any more
    announce()
elif sys.argv[1] == 'report':
    report()
else:
    raise NotImplementedError(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment