Skip to content

Instantly share code, notes, and snippets.

@pcuzner
Last active September 8, 2022 05:14
Show Gist options
  • Save pcuzner/4622aa9b66164c974da7c27b157d9682 to your computer and use it in GitHub Desktop.
Save pcuzner/4622aa9b66164c974da7c27b157d9682 to your computer and use it in GitHub Desktop.
Example exporter using the prometheus client to provide rbd-mirror metrics
#!/usr/bin/env python3
import threading
from prometheus_client import start_http_server
from prometheus_client.core import GaugeMetricFamily, REGISTRY
import os
import subprocess
import json
import time
import enum
from threading import Lock
import logging
# BUGS
# L228 mirror_image_lookup[mirror_state] ... can raise a KeyError ("state not found") when there are no rbd images being replicated
# Same line - when rbd is bootstrapping, the state is 'bootstrapping, IMAGE_SYNC/NOTIFY_SYNC_REQUEST'
# Send everything from DEBUG upwards to a log file that is truncated
# (filemode 'w') each time the exporter starts.
logging.basicConfig(
    filename='rbd-mirror-exporter.log',
    filemode='w',
    level=logging.DEBUG,
)
logger = logging.getLogger()

# Example code based on a two-cluster, snapshot-based rbd mirror.
# NB. rbd-mirror can replicate to multiple targets.
# This command defaults to the rbd pool; pass --pool <pool> for any other.
rbd_command = 'rbd mirror pool status --verbose --format json'
# {"summary":{"health":"OK","daemon_health":"OK","image_health":"OK","states":{"replaying":20}}
class RBDData:
    """Mutable holder for the most recent rbd-mirror poll result.

    Attributes:
        raw_json: decoded output of the last rbd status command ({} initially).
        collect_time: seconds the last poll took (0 initially).
        lock: ``threading.Lock`` used by readers and the writer of this object.
    """

    def __init__(self):
        self.lock = Lock()
        self.collect_time = 0
        self.raw_json = {}
def calculate_interval():
    """Return the number of seconds to wait between rbd status polls."""
    # rbd-mirror updates state every 30s, so polling faster gains nothing
    refresh_period_secs = 30
    return refresh_period_secs
def issue_command(cmd: str):
    """Run *cmd* without a shell and capture its output.

    The command line is tokenized with shell-style quoting rules, so an
    argument containing whitespace can be passed by quoting it
    (e.g. ``--pool 'my pool'``). A plain ``str.split`` would break such an
    argument into several pieces.

    Args:
        cmd: the command line to execute.

    Returns:
        The ``subprocess.CompletedProcess`` for the command; ``stdout`` and
        ``stderr`` hold the captured output as bytes. A non-zero exit status
        is reported through ``returncode``, not raised.

    Raises:
        OSError: the executable could not be started (e.g. it is not installed).
        ValueError: *cmd* contains unbalanced quotes.
    """
    # Function-local import keeps this block self-contained; the rest of the
    # module does not need shlex.
    import shlex

    return subprocess.run(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
def _fetch_rbd_status():
    """Run the rbd status command and return its decoded JSON object.

    Every failure mode is logged and mapped to an empty dict so that the
    caller's polling loop can never be terminated by a bad poll.
    """
    try:
        rbd_mirror_cmd = issue_command(rbd_command)
    except (OSError, ValueError):
        # e.g. the rbd binary is missing or not executable
        logger.exception('unable to run rbd command')
        return {}

    if rbd_mirror_cmd.returncode != 0:
        logger.error(f'rbd command failed : {rbd_mirror_cmd.returncode}')
        logger.error(f'rbd command stdout : {rbd_mirror_cmd.stdout}')
        logger.error(f'rbd command stderr : {rbd_mirror_cmd.stderr}')
        return {}

    try:
        parsed = json.loads(rbd_mirror_cmd.stdout)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors
        logger.exception('unable to decode rbd command output as JSON')
        return {}

    if not isinstance(parsed, dict):
        # Consumers call .get() on the result, so only a JSON object is usable
        logger.error('rbd command output is not a JSON object')
        return {}
    return parsed


def query_rbd(interval: int, rbd_data: RBDData):
    """Poll the rbd status command forever, publishing each result to *rbd_data*.

    Intended to run in a background thread. On every cycle the previous
    snapshot is replaced: with the decoded command output on success, or with
    an empty dict when the command fails or its output cannot be used, so a
    failed poll never leaves stale data behind. The lock is held while the
    command runs, exactly as the data is swapped.

    Args:
        interval: seconds to sleep between polls.
        rbd_data: shared object whose ``raw_json`` and ``collect_time`` are
            updated; ``raw_json`` is only written while ``lock`` is held.

    This function never returns.
    """
    while True:
        st = time.time()
        logger.debug('aqcuiring rbd stats object lock')
        with rbd_data.lock:
            logger.debug('aquired lock on stats object')
            rbd_data.raw_json = _fetch_rbd_status()
        logger.debug('released rbd stats object lock')

        rbd_data.collect_time = time.time() - st
        logger.info(f'rbd command elapsed time : {rbd_data.collect_time}')
        logger.debug(f'sleeping for {interval}s')
        time.sleep(interval)
# Gauge value reported for each image state name; the value is simply the
# position of the name in this sequence, starting at 0.
mirror_image_lookup = {
    state_name: state_code
    for state_code, state_name in enumerate((
        'unknown',
        'error',
        'syncing',
        'starting_replay',
        'replaying',
        'stopping_replay',
        'stopped',
        'split-brain',
    ))
}
class MirrorImageStatusState(enum.Enum):
    """Numeric codes for the rbd-mirror image status states.

    Member names and values follow cls_rbd_types.h.
    """

    unknown = 0
    error = 1
    syncing = 2
    starting_replay = 3
    replaying = 4
    stopping_replay = 5
    stopped = 6
class MirrorHealth(enum.Enum):
    """Numeric codes for the health strings found in the status summary.

    Member names and values follow MirrorDaemonServiceInfo.cc.
    """

    OK = 0
    UNKNOWN = 1
    WARNING = 2
    ERROR = 3
class RBDMirrorCollector:
    """Custom Prometheus collector that turns the latest rbd-mirror poll into gauges.

    The collector reads ``raw_json`` and ``collect_time`` from the shared data
    object while holding its ``lock``. Each call to :meth:`collect` builds a
    fresh set of gauge families, so overlapping scrapes never share or reset
    each other's samples, and the lock is released before anything is yielded.

    Unexpected input (unknown health strings, unknown or compound image
    states, images without a name, undecodable peer descriptions) is mapped to
    an "unknown" value or skipped with a log entry instead of aborting the
    whole scrape.
    """

    # (metric name, help text, label names) for every gauge family, in the
    # order the families are yielded. ``None`` means the gauge has no labels.
    _METRIC_SPECS = (
        ('ceph_rbd_mirror_image_metadata',
         'rbd-mirror image metadata',
         ('pool', 'namespace', 'image', 'state', 'role')),
        ('ceph_rbd_mirror_image_state',
         'rbd-mirror image status by site',
         ('pool', 'namespace', 'image', 'site_name')),
        ('ceph_rbd_mirror_image_snapshot_size_bytes',
         'Size of the last rbd-mirror snapshot (bytes)',
         ('pool', 'namespace', 'image', 'site_name')),
        ('ceph_rbd_mirror_image_timestamp',
         'EPOCH time since last successful snapshot on remote (int)',
         ('pool', 'namespace', 'image', 'site_name')),
        ('ceph_rbd_mirror_daemon_metadata',
         'rbd-mirror daemon configuration',
         ('hostname', 'ceph_version', 'leader', 'client_id')),
        ('ceph_rbd_mirror_collect_seconds',
         'time taken to gather rbd mirror data from ceph (secs)',
         None),
        ('ceph_rbd_mirror_daemon_health',
         'Overall daemon health',
         None),
        ('ceph_rbd_mirror_image_health',
         'Overall image health',
         None),
        ('ceph_rbd_mirror_health',
         'Overall health of rbd mirror environment',
         None),
    )

    def __init__(self, rbd_data: 'RBDData'):
        """Remember the shared data object and create an empty metric set.

        Args:
            rbd_data: object providing ``raw_json``, ``collect_time`` and ``lock``.
        """
        self.rbd_data = rbd_data
        # Families produced by the most recent collect(); replaced on every scrape.
        self.metrics = self._new_metrics()

    @classmethod
    def _new_metrics(cls):
        """Return a dict of new, sample-free gauge families keyed by metric name."""
        return {
            name: GaugeMetricFamily(name, documentation, labels=labels)
            for name, documentation, labels in cls._METRIC_SPECS
        }

    @staticmethod
    def _health_value(name):
        """Return the numeric code for health string *name* (UNKNOWN if unrecognised)."""
        try:
            return MirrorHealth[name].value
        except (KeyError, TypeError):
            return MirrorHealth.UNKNOWN.value

    @staticmethod
    def _parse_peer_description(desc):
        """Split a peer description into ``(state, status)``.

        *desc* is either a bare state text or a state followed by a JSON
        object, e.g. ``'replaying, {"bytes_per_snapshot": 1}'``. The state is
        the text before the first comma, so a compound description such as
        ``'bootstrapping, IMAGE_SYNC/NOTIFY_SYNC_REQUEST'`` yields
        ``'bootstrapping'``.

        Returns:
            ``(state, dict)`` with an empty dict when no JSON is present, or
            ``(state, None)`` when a JSON part is present but cannot be decoded.
        """
        # Partition on the brace rather than on ', ' so that separators inside
        # the JSON payload cannot confuse the split.
        state_text, brace, json_tail = desc.partition('{')
        mirror_state = state_text.split(',', 1)[0].strip()
        if not brace:
            return mirror_state, {}
        try:
            return mirror_state, json.loads(brace + json_tail)
        except json.JSONDecodeError:
            return mirror_state, None

    def clear(self):
        """Discard every sample held by the families in ``self.metrics``."""
        for family in self.metrics.values():
            family.samples = []

    def collect(self):
        """Yield one gauge family per metric, populated from the latest poll.

        Yields:
            Every family listed in ``_METRIC_SPECS``, in that order. Families
            with no data for this scrape are yielded without samples.
        """
        # hack for POC
        pool = 'rbd'
        namespace = ''

        metrics = self._new_metrics()

        logger.debug('collect - aquiring lock on rbd object')
        with self.rbd_data.lock:
            logger.debug('collect - lock aquired')

            metrics['ceph_rbd_mirror_collect_seconds'].add_metric(
                [], self.rbd_data.collect_time)

            raw_json = self.rbd_data.raw_json
            if not isinstance(raw_json, dict):
                # Only a JSON object can carry summary/daemons/images
                raw_json = {}

            summary = raw_json.get('summary', {})
            if summary:
                for metric_name, summary_key in (
                        ('ceph_rbd_mirror_daemon_health', 'daemon_health'),
                        ('ceph_rbd_mirror_image_health', 'image_health'),
                        ('ceph_rbd_mirror_health', 'health')):
                    metrics[metric_name].add_metric(
                        [],
                        self._health_value(summary.get(summary_key, 'UNKNOWN')))

            # process the daemon information
            for daemon in raw_json.get('daemons', []):
                metrics['ceph_rbd_mirror_daemon_metadata'].add_metric([
                    daemon.get('hostname', ''),
                    daemon.get('ceph_version', ''),
                    str(daemon.get('leader', '')),
                    daemon.get('client_id', ''),
                ], 1)

            # process each image
            for image in raw_json.get('images', []):
                image_name = image.get('name', '')
                if not image_name:
                    # An unnamed image cannot be labelled; skip it rather than
                    # failing the whole scrape.
                    logger.error(f'skipping image without a name in pool {pool}')
                    continue

                if image.get('description') == 'local image is primary':
                    image_role = 'primary'
                else:
                    image_role = 'secondary'

                for peer in image.get('peer_sites', []):
                    mirror_state, status = self._parse_peer_description(
                        peer.get('description') or '')
                    if status is None:
                        logger.error(f'unable to decode description field of {pool}/{image_name}')
                        continue

                    # Label values must be strings, so never pass None through
                    site_name = peer.get('site_name') or ''
                    site_labels = [pool, namespace, image_name, site_name]

                    metrics['ceph_rbd_mirror_image_metadata'].add_metric([
                        pool,
                        namespace,
                        image_name,
                        mirror_state,
                        image_role,
                    ], 1)
                    # States missing from the lookup table are reported as 'unknown'
                    metrics['ceph_rbd_mirror_image_state'].add_metric(
                        site_labels,
                        mirror_image_lookup.get(
                            mirror_state, mirror_image_lookup['unknown']))
                    metrics['ceph_rbd_mirror_image_snapshot_size_bytes'].add_metric(
                        site_labels, status.get('bytes_per_snapshot', 0))
                    metrics['ceph_rbd_mirror_image_timestamp'].add_metric(
                        site_labels, status.get('remote_snapshot_timestamp', 0))

            self.metrics = metrics
        logger.debug('collect - released lock')

        # Yield only after the lock is released: the families are private to
        # this scrape, so the consumer can take as long as it likes.
        yield from metrics.values()
def main():
    """Start the rbd polling thread and the metrics endpoint, then wait for Ctrl-C.

    The listening port is taken from the ``PORT`` environment variable when it
    holds an integer in the range 0-65535; otherwise port 8089 is used and the
    rejected value is logged.
    """
    default_port = 8089
    interval = calculate_interval()
    rbd_data = RBDData()

    # Daemon thread: it loops forever and must not keep the process alive
    rbd_query = threading.Thread(target=query_rbd, args=(interval, rbd_data))
    rbd_query.daemon = True
    rbd_query.start()

    REGISTRY.register(RBDMirrorCollector(rbd_data))

    listening_port = default_port
    requested_port = os.getenv('PORT')
    if requested_port:
        try:
            candidate_port = int(requested_port)
        except ValueError:
            candidate_port = None
        # An out-of-range integer would otherwise only fail later, at bind time
        if candidate_port is not None and 0 <= candidate_port <= 65535:
            listening_port = candidate_port
        else:
            logger.warning(
                f'invalid PORT value {requested_port!r}, using default port {default_port}')

    start_http_server(listening_port)
    logger.info("exporter started")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    logger.info('exporter shutdown')
# Run the exporter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment