basic rgw-exporter to show radosgw-admin and prometheus-client
#!/usr/bin/env python3
# simple POC code to illustrate the integration of radosgw-admin and a prometheus
# client
#
# Requires:
# python3-jmespath
# python3-prometheus_client
# ceph-common installed (i.e. radosgw-admin command is available)
# ceph.conf configured
# Performance
#   4 buckets with radosgw-admin - 0.4s
#   996 buckets with radosgw-admin - 94s (list buckets 100ms, each bucket stats 90ms)
#   4 buckets with http API - 20ms
#   996 buckets with http API - 1.6s (list_buckets 29ms, each bucket stats 2ms)
#   996 buckets with asyncio http API calls - ~1.5s
#   996 buckets using a ThreadPoolExecutor on a 32 core machine - 0.7s, consuming 1/2 a core (peak)
# Running (with env variables)
# RGW_URL='http://my-rgw-server' ACCESS_KEY='accessKEY' SECRET_KEY='SECRETkey' BUCKETLIST='bucket1,bucket2,bucket3' ./rgw-exporter.py
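#
# Once running, scrape the metrics endpoint (illustrative; the port defaults to
# 9198 and can be overridden with the PORT env variable):
#   curl http://localhost:9198/metrics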
import os
import time
import json
import logging
import jmespath # this is a separate rpm/package
import datetime
import subprocess
import hmac
import base64
import concurrent.futures
from hashlib import sha1
from urllib.request import Request, urlopen
from urllib.parse import urlencode, urlparse
from urllib.error import HTTPError
from typing import Dict, List, Optional, Union, Tuple, Any
from threading import Lock
# prometheus_client is a separate rpm/package
from prometheus_client import start_http_server
from prometheus_client.core import GaugeMetricFamily, REGISTRY
logging.basicConfig(
filename='rgw-exporter.log',
format='%(asctime)s %(levelname)8s - %(message)s',
filemode='w',
level=logging.DEBUG)
logger = logging.getLogger()
def time_it(func):
def wrapped(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
logger.debug(f"{func.__name__} elapsed time: {time.perf_counter() - start}s")
return result
return wrapped
def issue_command(cmd: str) -> Tuple[int, Union[List[str], Dict[str, str], str]]:
response = subprocess.run(
cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
if response.returncode == 0:
return 0, json.loads(response.stdout.decode('utf-8'))
err_msg = response.stderr.decode('utf-8')
logger.error(f"{cmd} failed. RC={response.returncode}. Error={err_msg}")
return response.returncode, err_msg
def get_now():
tstamp = datetime.datetime.utcnow()
return f"{tstamp.strftime('%a, %d %b %Y %H:%M:%S')} +0000"
def gensig(now, path='/admin/bucket'):
    # create an AWS v2 signature (HMAC-SHA1) for the request
string_to_sign = f"GET\n\n\n{now}\n{path}".encode('utf-8')
hashed = hmac.new(os.getenv('SECRET_KEY').encode('utf-8'), string_to_sign, sha1)
return base64.b64encode(hashed.digest()).decode('utf-8')
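# As a sketch, for a GET of /admin/bucket the string-to-sign looks like
#   "GET\n\n\n<date>\n/admin/bucket"
# (the empty lines are the unused Content-MD5 and Content-Type fields); it is
# HMAC-SHA1 signed with SECRET_KEY and base64 encoded.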
@time_it
def api_list_buckets() -> Tuple[int, Union[List[str], str]]:
url = os.getenv('RGW_URL') + '/admin/bucket'
return admin_ops_api(url)
def admin_ops_api(url: str) -> Tuple[int, Union[List[str], Dict[str, str], str]]:
now = get_now()
parsed_url = urlparse(url)
signature = gensig(now, parsed_url.path)
r = Request(url,
headers={
"Accept": "application/json",
"Authorization": f"AWS {os.getenv('ACCESS_KEY')}:{signature}",
"Date": now
}, method='GET')
try:
response = urlopen(r)
except HTTPError as e:
err_msg = f"admin ops call failed to {url}. HTTP code: {e.code}, Reason: {e.reason}"
logger.error(err_msg)
return 1, err_msg
return 0, json.loads(response.read().decode('utf-8'))
def no_op(*args, **kwargs) -> Tuple[int, Union[List[str], Dict[str, str], str]]:
return 0, []
@time_it
def api_get_user_info(user_id: str) -> Tuple[int, Union[Dict[str, str], str]]:
logger.debug('querying user ' + user_id)
params = {
"uid": user_id
}
url = os.getenv('RGW_URL') + '/admin/user' + '?' + urlencode(params)
return admin_ops_api(url)
@time_it
def api_get_bucket(bucket_name: str) -> Tuple[int, Union[Dict[str, str], str]]:
params = {
"bucket": bucket_name,
"stats": True
}
url = os.getenv('RGW_URL') + '/admin/bucket' + '?' + urlencode(params)
return admin_ops_api(url)
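# For illustration, a bucket stats request for 'bucket1' would hit a URL like
#   http://my-rgw-server/admin/bucket?bucket=bucket1&stats=True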
@time_it
def rgw_admin_get_bucket(bucket: str) -> Tuple[int, Union[Dict[str, str], str]]:
return issue_command(f"radosgw-admin bucket stats --bucket={bucket}")
@time_it
def rgw_admin_list_buckets() -> Tuple[int, Union[List[str], str]]:
return issue_command('radosgw-admin bucket list')
def split_list(items, chunk_size):
for i in range(0, len(items), chunk_size):
yield items[i:i + chunk_size]
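# For example (illustrative values):
#   list(split_list(['b1', 'b2', 'b3', 'b4', 'b5'], 2))
#   -> [['b1', 'b2'], ['b3', 'b4'], ['b5']]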
class RGWCollector:
api_env_vars = ['ACCESS_KEY', 'SECRET_KEY', 'RGW_URL']
def __init__(self, bucket_allowlist: str):
self.bucket_allowlist = bucket_allowlist
self.first_run = True
self._mode = None
self.group_size = 250
self.metrics = {
'rgw_exporter_bucket_object_count': GaugeMetricFamily(
'rgw_exporter_bucket_object_count',
'Object count per bucket',
labels=['bucket'],
),
'rgw_exporter_bucket_usage_bytes': GaugeMetricFamily(
'rgw_exporter_bucket_usage_bytes',
'Capacity used by objects in the bucket',
labels=['bucket'],
),
'rgw_exporter_bucket_shard_count': GaugeMetricFamily(
'rgw_exporter_bucket_shard_count',
                'The number of RADOS objects (shards) a bucket index is using',
labels=['bucket'],
),
'rgw_exporter_bucket_info': GaugeMetricFamily(
'rgw_exporter_bucket_info',
'Bucket metadata',
labels=['bucket', 'owner', 'index_type', 'placement_rule', 'id'],
),
'rgw_exporter_duration_seconds': GaugeMetricFamily(
'rgw_exporter_duration_seconds',
'Duration (secs) of the data gathering',
labels=[],
),
'rgw_exporter_user_info': GaugeMetricFamily(
'rgw_exporter_user_info',
'RGW User information',
labels=['user_id', 'display_name', 'suspended'],
),
'rgw_exporter_user_max_buckets_count': GaugeMetricFamily(
'rgw_exporter_user_max_buckets_count',
                'Maximum number of buckets a user can create',
labels=['user_id'],
),
}
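        # Illustrative sample of the resulting scrape output:
        #   rgw_exporter_bucket_object_count{bucket="bucket1"} 42.0
        #   rgw_exporter_bucket_usage_bytes{bucket="bucket1"} 1048576.0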
# we'll use a lock to prevent multiple scrapes running at the same time
self.lock = Lock()
logger.info(f'Exporter running in {self.mode} mode')
if self.mode == 'api':
self.list_buckets_func = api_list_buckets
self.get_bucket_func = api_get_bucket
self.get_user_info_func = api_get_user_info
else:
self.list_buckets_func = rgw_admin_list_buckets
self.get_bucket_func = rgw_admin_get_bucket
self.get_user_info_func = no_op
@property
def mode(self):
if self._mode:
return self._mode
if all([env_var in os.environ for env_var in RGWCollector.api_env_vars]):
logger.debug(f'Environment variables for API mode present ({",".join(RGWCollector.api_env_vars)})')
self._mode = 'api'
else:
logger.debug('Using radosgw-admin commands')
self._mode = 'radosgw-admin'
return self._mode
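    # e.g. with ACCESS_KEY, SECRET_KEY and RGW_URL all set, mode is 'api';
    # otherwise the exporter falls back to radosgw-admin commands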
def clear(self):
for metric in self.metrics:
self.metrics[metric].samples = []
def collect(self):
        # By default, the collect method is called when the collector is registered,
        # which in a large environment may block for a few seconds before the process
        # becomes ready - so we skip the initial collect call.
if self.first_run:
self.first_run = False
return
        logger.debug('collect starting - acquiring lock')
with self.lock:
            logger.debug('collect started - lock acquired')
self.clear()
start = time.perf_counter()
err, bucket_data = self.list_buckets_func()
if self.bucket_allowlist:
filtered_buckets = [b for b in bucket_data if b in self.bucket_allowlist]
else:
filtered_buckets = bucket_data
            # We could split the buckets up into separate processing groups and
            # run them in parallel processes or threads, but this still won't
            # scale to 100,000's of buckets. The groups list is unused for now;
            # it's left here as a starting point.
            groups = list(split_list(filtered_buckets, self.group_size))
user_list = set()
if not err:
data = []
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # use the mode-appropriate stats function, so radosgw-admin mode works too
                    future_stats = {executor.submit(self.get_bucket_func, bucket_name): bucket_name for bucket_name in filtered_buckets}
                    for future in concurrent.futures.as_completed(future_stats):
                        try:
                            result = future.result()
                        except Exception as e:
                            logger.error(f"stats fetch for bucket '{future_stats[future]}' failed: {e}")
                            continue
                        data.append(result)
for err, bucket_stats in data:
if err:
continue
bucket_name = jmespath.search('bucket', bucket_stats)
                    num_objects = jmespath.search('usage."rgw.main".num_objects', bucket_stats) or 0
                    size_actual = jmespath.search('usage."rgw.main".size_actual', bucket_stats) or 0
num_shards = jmespath.search('num_shards', bucket_stats)
owner = jmespath.search('owner', bucket_stats)
user_list.add(owner)
placement_rule = jmespath.search('placement_rule', bucket_stats)
bucket_id = jmespath.search('id', bucket_stats)
index_type = jmespath.search('index_type', bucket_stats)
self.metrics['rgw_exporter_bucket_info'].add_metric(
[bucket_name, owner, index_type, placement_rule, bucket_id], 1)
if num_objects is not None:
self.metrics['rgw_exporter_bucket_object_count'].add_metric(
[bucket_name], num_objects)
if size_actual is not None:
self.metrics['rgw_exporter_bucket_usage_bytes'].add_metric(
[bucket_name], size_actual)
if num_shards is not None:
self.metrics['rgw_exporter_bucket_shard_count'].add_metric(
[bucket_name], num_shards)
# Could use another threadpool here, but processing serially for now
for user in user_list:
err, user_data = self.get_user_info_func(user)
if err:
logger.error('Error querying user information - possible lack of privileges?')
logger.error('Exporter skipping remaining user queries for this scrape request')
break
suspended = jmespath.search('suspended', user_data)
suspended_t = 'yes' if suspended else 'no'
max_buckets = jmespath.search('max_buckets', user_data)
display_name = jmespath.search('display_name', user_data)
self.metrics['rgw_exporter_user_info'].add_metric(
[user, display_name, suspended_t], 1)
self.metrics['rgw_exporter_user_max_buckets_count'].add_metric(
[user], max_buckets)
self.metrics['rgw_exporter_duration_seconds'].add_metric([],
(time.perf_counter() - start))
for metric_name in self.metrics:
yield self.metrics[metric_name]
logger.debug('collect finished - lock released')
def main():
default_port = 9198
env_bucket_allowlist = os.getenv('BUCKETLIST')
if env_bucket_allowlist:
bucket_allowlist = env_bucket_allowlist.split(',')
else:
bucket_allowlist = []
REGISTRY.register(RGWCollector(bucket_allowlist))
try:
listening_port = int(os.getenv('PORT') or default_port)
except ValueError:
logger.error(f"Environment variable PORT is not an integer, using default ({default_port})")
listening_port = default_port
if bucket_allowlist:
logger.info('Bucket scanning limited to the following buckets:')
for b in bucket_allowlist:
logger.info(f'- {b}')
start_http_server(listening_port)
logger.info(f'rgw-exporter started, listening on port {listening_port}')
print("\nPress CTRL-C to shutdown")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
logger.info('rgw-exporter shutdown')
if __name__ == '__main__':
main()