Last active
October 28, 2022 04:17
-
-
Save pcuzner/7677c226a9fc9c223a076c189db33b7b to your computer and use it in GitHub Desktop.
basic rgw-exporter to show radosgw-admin and prometheus-client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# simple POC code to illustrate the integration of radosgw-admin and a prometheus | |
# client | |
# | |
# Requires: | |
# python3-jmespath | |
# python3-prometheus_client | |
# ceph-common installed (i.e. radosgw-admin command is available) | |
# ceph.conf configured | |
# Performance | |
# 4 buckets with radosgw-admin - .4s | |
# 996 buckets with radosgw-admin - 94s - list buckets 100ms, each bucket stats 90ms | |
# 4 buckets with http API - 20ms | |
# 996 buckets with http API - 1.6s - list_buckets 29ms, each bucket stats 2ms | |
# 996 buckets with asyncio http API calls - gave ~1.5s | |
# 996 buckets using threadpoolexecutor on 32 core machine - 0.7s consuming 1/2 core (peak) | |
# Running (with env variables) | |
# RGW_URL='http://my-rgw-server' ACCESS_KEY='accessKEY' SECRET_KEY='SECRETkey' BUCKETLIST='bucket1,bucket2,bucket3' ./rgw-exporter.py | |
import base64
import concurrent.futures
import datetime
import functools
import hmac
import json
import logging
import os
import subprocess
import time
from hashlib import sha1
from threading import Lock
from typing import Dict, List, Optional, Union, Tuple, Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urlparse
from urllib.request import Request, urlopen

# third-party packages (separate rpms): python3-jmespath, python3-prometheus_client
import jmespath
from prometheus_client import start_http_server
from prometheus_client.core import GaugeMetricFamily, REGISTRY
# Log to a file in the working directory. filemode='w' truncates the log on
# every start, so a restarted exporter begins with a fresh log.
logging.basicConfig(
    filename='rgw-exporter.log',
    format='%(asctime)s %(levelname)8s - %(message)s',
    filemode='w',
    level=logging.DEBUG)
# Root logger, shared by every function in this module.
logger = logging.getLogger()
def time_it(func):
    """Decorator that logs the wrapped callable's elapsed wall time at DEBUG level.

    Uses time.perf_counter() for a monotonic, high-resolution measurement.
    """
    # functools.wraps preserves the wrapped function's __name__/__doc__ so the
    # logged name stays correct even when decorators are stacked, and
    # introspection/debugging sees the real function rather than 'wrapped'.
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        logger.debug(f"{func.__name__} elapsed time: {time.perf_counter() - start}s")
        return result
    return wrapped
def issue_command(cmd: str) -> Tuple[int, Union[List[str], Dict[str, str], str]]:
    """Run a whitespace-delimited command and JSON-decode its stdout.

    Returns (0, parsed_json) on success, or (returncode, stderr_text) when the
    command exits non-zero (the failure is also logged).
    """
    proc = subprocess.run(
        cmd.split(),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    if proc.returncode != 0:
        error_text = proc.stderr.decode('utf-8')
        logger.error(f"{cmd} failed. RC={proc.returncode}. Error={error_text}")
        return proc.returncode, error_text
    return 0, json.loads(proc.stdout.decode('utf-8'))
def get_now() -> str:
    """Return the current UTC time formatted for an HTTP Date header.

    Example: 'Mon, 01 Jan 2024 00:00:00 +0000'. The offset is hard-coded
    because the timestamp is always taken in UTC.
    """
    # datetime.utcnow() is deprecated since Python 3.12; a timezone-aware
    # "now" in UTC yields the identical wall-clock fields.
    tstamp = datetime.datetime.now(datetime.timezone.utc)
    return f"{tstamp.strftime('%a, %d %b %Y %H:%M:%S')} +0000"
def gensig(now, path='/admin/bucket'):
    """Sign a GET request for the RGW admin ops API.

    This is the legacy AWS signature v2 scheme (HMAC-SHA1 over a canonical
    string, base64-encoded) — not SigV4 — which is what the admin ops
    endpoint expects. SECRET_KEY must be present in the environment.
    """
    canonical = f"GET\n\n\n{now}\n{path}"
    key = os.getenv('SECRET_KEY').encode('utf-8')
    digest = hmac.new(key, canonical.encode('utf-8'), sha1).digest()
    return base64.b64encode(digest).decode('utf-8')
@time_it
def api_list_buckets() -> Tuple[int, Union[List[str], str]]:
    """List all bucket names via the RGW admin ops HTTP API."""
    endpoint = os.getenv('RGW_URL') + '/admin/bucket'
    return admin_ops_api(endpoint)
def admin_ops_api(url: str) -> Tuple[int, Union[List[str], Dict[str, str], str]]:
    """Issue a signed GET against the RGW admin ops API and decode the JSON body.

    Returns (0, decoded_json) on success, or (1, error_message) on any HTTP or
    connection-level failure (which is also logged).
    """
    now = get_now()
    parsed_url = urlparse(url)
    # the v2 signature covers only the path, not the query string
    signature = gensig(now, parsed_url.path)
    r = Request(url,
                headers={
                    "Accept": "application/json",
                    "Authorization": f"AWS {os.getenv('ACCESS_KEY')}:{signature}",
                    "Date": now
                }, method='GET')
    try:
        # close the connection deterministically instead of relying on GC
        with urlopen(r) as response:
            return 0, json.loads(response.read().decode('utf-8'))
    except HTTPError as e:
        err_msg = f"admin ops call failed to {url}. HTTP code: {e.code}, Reason: {e.reason}"
        logger.error(err_msg)
        return 1, err_msg
    except URLError as e:
        # connection refused / DNS failure / timeout - previously this
        # propagated and killed the scrape; report it like an HTTP failure.
        # (HTTPError is a subclass of URLError, so it must be caught first.)
        err_msg = f"admin ops call failed to {url}. Reason: {e.reason}"
        logger.error(err_msg)
        return 1, err_msg
def no_op(*args, **kwargs) -> Tuple[int, Union[List[str], Dict[str, str], str]]:
    """Stand-in gatherer for unsupported queries: always 'succeeds' with no data."""
    empty: List[str] = []
    return 0, empty
@time_it
def api_get_user_info(user_id: str) -> Tuple[int, Union[Dict[str, str], str]]:
    """Fetch metadata for a single RGW user via the admin ops API."""
    logger.debug('querying user ' + user_id)
    query = urlencode({"uid": user_id})
    endpoint = os.getenv('RGW_URL') + '/admin/user' + '?' + query
    return admin_ops_api(endpoint)
@time_it
def api_get_bucket(bucket_name: str) -> Tuple[int, Union[Dict[str, str], str]]:
    """Fetch per-bucket statistics via the admin ops API."""
    query = urlencode({"bucket": bucket_name, "stats": True})
    endpoint = os.getenv('RGW_URL') + '/admin/bucket' + '?' + query
    return admin_ops_api(endpoint)
@time_it
def rgw_admin_get_bucket(bucket: str) -> Tuple[int, Union[Dict[str, str], str]]:
    """Fetch per-bucket statistics by shelling out to radosgw-admin."""
    cmd = f"radosgw-admin bucket stats --bucket={bucket}"
    return issue_command(cmd)
@time_it
def rgw_admin_list_buckets() -> Tuple[int, Union[List[str], str]]:
    """List all bucket names by shelling out to radosgw-admin."""
    cmd = 'radosgw-admin bucket list'
    return issue_command(cmd)
def split_list(items, chunk_size):
    """Yield successive chunk_size-sized slices of items; the last may be shorter."""
    index = 0
    total = len(items)
    while index < total:
        yield items[index:index + chunk_size]
        index += chunk_size
class RGWCollector:
    """Prometheus custom collector for RGW bucket and user statistics.

    Two gathering modes are supported:
    - 'api': signed HTTP calls to the RGW admin ops API, selected when
      ACCESS_KEY, SECRET_KEY and RGW_URL are all present in the environment
      (fast, and enables per-user queries)
    - 'radosgw-admin': shell out to the radosgw-admin CLI (slower; user
      queries are not available, so no_op is used for them)
    """

    # all of these env vars must be present to enable 'api' mode
    api_env_vars = ['ACCESS_KEY', 'SECRET_KEY', 'RGW_URL']

    def __init__(self, bucket_allowlist: List[str]):
        # optional list of bucket names to report on; empty means all buckets
        self.bucket_allowlist = bucket_allowlist
        # the registry invokes collect() at register time - skip that call
        self.first_run = True
        self._mode = None
        # retained for callers that want to batch buckets with split_list()
        self.group_size = 250
        self.metrics = {
            'rgw_exporter_bucket_object_count': GaugeMetricFamily(
                'rgw_exporter_bucket_object_count',
                'Object count per bucket',
                labels=['bucket'],
            ),
            'rgw_exporter_bucket_usage_bytes': GaugeMetricFamily(
                'rgw_exporter_bucket_usage_bytes',
                'Capacity used by objects in the bucket',
                labels=['bucket'],
            ),
            'rgw_exporter_bucket_shard_count': GaugeMetricFamily(
                'rgw_exporter_bucket_shard_count',
                'The number of RADOS objects(shards) a bucket index is using',
                labels=['bucket'],
            ),
            'rgw_exporter_bucket_info': GaugeMetricFamily(
                'rgw_exporter_bucket_info',
                'Bucket metadata',
                labels=['bucket', 'owner', 'index_type', 'placement_rule', 'id'],
            ),
            'rgw_exporter_duration_seconds': GaugeMetricFamily(
                'rgw_exporter_duration_seconds',
                'Duration (secs) of the data gathering',
                labels=[],
            ),
            'rgw_exporter_user_info': GaugeMetricFamily(
                'rgw_exporter_user_info',
                'RGW User information',
                labels=['user_id', 'display_name', 'suspended'],
            ),
            'rgw_exporter_user_max_buckets_count': GaugeMetricFamily(
                'rgw_exporter_user_max_buckets_count',
                'maximum number a buckets a user can create',
                labels=['user_id'],
            ),
        }
        # we'll use a lock to prevent multiple scrapes running at the same time
        self.lock = Lock()
        logger.info(f'Exporter running in {self.mode} mode')
        if self.mode == 'api':
            self.list_buckets_func = api_list_buckets
            self.get_bucket_func = api_get_bucket
            self.get_user_info_func = api_get_user_info
        else:
            self.list_buckets_func = rgw_admin_list_buckets
            self.get_bucket_func = rgw_admin_get_bucket
            self.get_user_info_func = no_op  # CLI mode can't query users

    @property
    def mode(self):
        """Determine (once) and cache the gathering mode: 'api' or 'radosgw-admin'."""
        if self._mode:
            return self._mode
        if all(env_var in os.environ for env_var in RGWCollector.api_env_vars):
            logger.debug(f'Environment variables for API mode present ({",".join(RGWCollector.api_env_vars)})')
            self._mode = 'api'
        else:
            logger.debug('Using radosgw-admin commands')
            self._mode = 'radosgw-admin'
        return self._mode

    def clear(self):
        """Drop all samples accumulated by the previous scrape."""
        for metric in self.metrics:
            self.metrics[metric].samples = []

    def collect(self):
        """Gather bucket/user statistics and yield the metric families.

        By default the collect method is called on registration, which on a
        large environment may block for a few seconds before the process
        comes ready - so we skip that initial call.
        """
        if self.first_run:
            self.first_run = False
            return
        logger.debug('collect starting - acquiring lock')
        with self.lock:
            logger.debug('collect started - lock acquired')
            self.clear()
            start = time.perf_counter()
            err, bucket_data = self.list_buckets_func()
            user_list = set()
            if not err:
                # only filter on success: on error bucket_data is an error
                # string, which previously got filtered character by character
                if self.bucket_allowlist:
                    filtered_buckets = [b for b in bucket_data if b in self.bucket_allowlist]
                else:
                    filtered_buckets = bucket_data
                # A thread pool parallelises the per-bucket stat fetches, but
                # this still won't scale to 100,000's of buckets
                data = []
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # BUGFIX: use the mode-selected fetcher; this was
                    # hardwired to api_get_bucket, breaking radosgw-admin mode
                    future_stats = {
                        executor.submit(self.get_bucket_func, bucket_name): bucket_name
                        for bucket_name in filtered_buckets
                    }
                    for future in concurrent.futures.as_completed(future_stats):
                        try:
                            data.append(future.result())
                        except Exception as e:
                            # BUGFIX: previously printed and then appended a
                            # stale/unbound 'result'; log and skip the bucket
                            logger.error(f"stats fetch for bucket '{future_stats[future]}' failed: {e}")
                for bucket_err, bucket_stats in data:
                    if bucket_err:
                        continue
                    bucket_name = jmespath.search('bucket', bucket_stats)
                    # 'usage."rgw.main"' may be absent on an empty bucket
                    num_objects = jmespath.search('usage."rgw.main".num_objects', bucket_stats) or 0
                    size_actual = jmespath.search('usage."rgw.main".size_actual', bucket_stats) or 0
                    num_shards = jmespath.search('num_shards', bucket_stats)
                    owner = jmespath.search('owner', bucket_stats)
                    user_list.add(owner)
                    placement_rule = jmespath.search('placement_rule', bucket_stats)
                    bucket_id = jmespath.search('id', bucket_stats)
                    index_type = jmespath.search('index_type', bucket_stats)
                    self.metrics['rgw_exporter_bucket_info'].add_metric(
                        [bucket_name, owner, index_type, placement_rule, bucket_id], 1)
                    # num_objects/size_actual can't be None after the 'or 0' above
                    self.metrics['rgw_exporter_bucket_object_count'].add_metric(
                        [bucket_name], num_objects)
                    self.metrics['rgw_exporter_bucket_usage_bytes'].add_metric(
                        [bucket_name], size_actual)
                    if num_shards is not None:
                        self.metrics['rgw_exporter_bucket_shard_count'].add_metric(
                            [bucket_name], num_shards)
            # Could use another threadpool here, but processing serially for now
            for user in user_list:
                user_err, user_data = self.get_user_info_func(user)
                if user_err:
                    logger.error('Error querying user information - possible lack of privileges?')
                    logger.error('Exporter skipping remaining user queries for this scrape request')
                    break
                if not user_data:
                    # no_op (radosgw-admin mode) returns an empty payload;
                    # previously this produced None-valued labels/samples
                    continue
                suspended = jmespath.search('suspended', user_data)
                suspended_t = 'yes' if suspended else 'no'
                max_buckets = jmespath.search('max_buckets', user_data)
                display_name = jmespath.search('display_name', user_data)
                self.metrics['rgw_exporter_user_info'].add_metric(
                    [user, display_name, suspended_t], 1)
                self.metrics['rgw_exporter_user_max_buckets_count'].add_metric(
                    [user], max_buckets)
            self.metrics['rgw_exporter_duration_seconds'].add_metric(
                [], (time.perf_counter() - start))
            for metric_name in self.metrics:
                yield self.metrics[metric_name]
            logger.debug('collect finished - lock released')
def main():
    """Register the collector, start the metrics endpoint and run until CTRL-C.

    Environment:
      BUCKETLIST - optional comma-separated allowlist of bucket names
      PORT       - optional listening port (default 9198)
    """
    default_port = 9198
    allowlist_env = os.getenv('BUCKETLIST')
    bucket_allowlist = allowlist_env.split(',') if allowlist_env else []
    REGISTRY.register(RGWCollector(bucket_allowlist))
    try:
        listening_port = int(os.getenv('PORT') or default_port)
    except ValueError:
        logger.error(f"Environment variable PORT is not an integer, using default ({default_port})")
        listening_port = default_port
    if bucket_allowlist:
        logger.info('Bucket scanning limited to the following buckets:')
        for b in bucket_allowlist:
            logger.info(f'- {b}')
    start_http_server(listening_port)
    logger.info(f'rgw-exporter started, listening on port {listening_port}')
    print("\nPress CTRL-C to shutdown")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    logger.info('rgw-exporter shutdown')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment