Skip to content

Instantly share code, notes, and snippets.

@wido
Last active May 6, 2022 21:14
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save wido/fa97b735eb96ea459036bb31a989a45e to your computer and use it in GitHub Desktop.
Save wido/fa97b735eb96ea459036bb31a989a45e to your computer and use it in GitHub Desktop.
Measure Ceph RADOS latency and send to InfluxDB
#!/usr/bin/env python
'''
This script writes X kilobytes of random data to each of a number of RADOS objects, reads the data back and then removes the objects.
It sends the measured write and read latencies to InfluxDB so you can plot the latency of your Ceph cluster.
Author: Wido den Hollander <wido@widodh.nl>
'''
import argparse
import uuid
import timeit
import datetime
import socket
import logging
import string
import random
from influxdb import InfluxDBClient
from rados import Rados, Error
# Root logging configuration: INFO and above go to the default basicConfig handler.
logging.basicConfig(level=logging.INFO)
# Module logger used for all messages emitted by this script.
log = logging.getLogger(__name__)
def random_data(size):
    """Return a string of ``size`` random lowercase ASCII letters.

    Used as the payload written to each benchmark object.

    :param size: number of characters to generate (0 yields an empty string)
    :returns: str of length ``size`` drawn from ``string.ascii_lowercase``
    """
    # string.ascii_lowercase and range() exist on both Python 2 and 3, unlike
    # the Python-2-only string.lowercase (which is also locale-dependent) and
    # xrange().
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(size))
def wrapper(func, *args, **kwargs):
    """Bind ``func`` to the given arguments and return a zero-argument callable.

    ``timeit.timeit`` accepts a callable that takes no arguments, so the call
    ``func(*args, **kwargs)`` is deferred until the returned callable runs.
    Its return value is passed through unchanged.
    """
    return lambda: func(*args, **kwargs)
def rados_write(io, obj, data):
    """Replace the whole contents of object ``obj`` with ``data``.

    :param io: I/O context returned by ``Rados.open_ioctx``
    :param obj: object name
    :param data: payload to store
    :returns: None; the result of ``write_full`` is not propagated
    """
    write_full = io.write_full
    write_full(obj, data)
def rados_read(io, obj, len):
    """Read ``len`` bytes from object ``obj`` and discard them.

    Only the duration of the call matters to the caller, so the data read is
    not returned.

    :param io: I/O context returned by ``Rados.open_ioctx``
    :param obj: object name
    :param len: number of bytes to read (the name shadows the builtin but is
        kept for interface compatibility)
    :returns: None
    """
    reader = io.read
    reader(obj, len)
def _bench_pool(cluster, pool, data, length, num_objects):
    """Time ``num_objects`` write/read round trips in ``pool``; return mean latencies in ms."""
    ioctx = cluster.open_ioctx(pool)
    try:
        write_total_time = 0.0
        read_total_time = 0.0
        for _ in range(num_objects):
            # Unique object name per iteration.
            obj = str(uuid.uuid4())
            write_total_time += timeit.timeit(
                wrapper(rados_write, ioctx, obj, data), number=1)
            try:
                read_total_time += timeit.timeit(
                    wrapper(rados_read, ioctx, obj, length), number=1)
            finally:
                # Remove the benchmark object even if the read failed, so a
                # failing run does not leave objects behind in the pool.
                ioctx.remove_object(obj)
    finally:
        # Release the I/O context also when a RADOS call raised.
        ioctx.close()
    return {'write_latency': (write_total_time / num_objects) * 1000,
            'read_latency': (read_total_time / num_objects) * 1000}


def rados_bench(ceph_conf, pools, rados_id, object_size, num_objects):
    """Measure mean RADOS write and read latency for each pool.

    Connects to the cluster described by ``ceph_conf`` with cephx ID
    ``rados_id``. In every pool it writes ``num_objects`` objects of
    ``object_size`` KiB, reads each one back and removes it.

    :param ceph_conf: path to the ceph configuration file
    :param pools: iterable of pool names to benchmark
    :param rados_id: cephx ID to connect with
    :param object_size: object size in KiB
    :param num_objects: objects to write/read per pool; must be at least 1
    :returns: ``{'fsid': <cluster fsid>,
        'pools': {pool: {'write_latency': ms, 'read_latency': ms}}}``.
        A pool whose benchmark raised a rados ``Error`` is logged and omitted.
    :raises ValueError: if ``num_objects`` is smaller than 1
    """
    if num_objects < 1:
        # Latencies are averaged over num_objects; 0 would divide by zero.
        raise ValueError('num_objects must be at least 1, got %r' % (num_objects,))

    r = Rados(conffile=ceph_conf, rados_id=rados_id)
    try:
        # Bound mount, monitor and OSD operations to a 10 second timeout.
        r.conf_set("client_mount_timeout", "10")
        r.conf_set("rados_mon_op_timeout", "10")
        r.conf_set("rados_osd_op_timeout", "10")
        r.connect()
        fsid = r.get_fsid()

        length = object_size * 1024
        data = random_data(length)
        stats = dict()
        for pool in pools:
            try:
                stats[pool] = _bench_pool(r, pool, data, length, num_objects)
            except Error as exc:
                # A failing pool is reported but does not abort the other pools.
                log.error(exc)
    finally:
        # Shut the handle down on every path, including a failed connect().
        r.shutdown()
    return {'fsid': fsid, 'pools': stats}
def send_to_influx(host, port, user, password, db, ssl, verify, stats):
    """Write a list of InfluxDB points to database ``db``.

    :param host: InfluxDB host name
    :param port: InfluxDB port
    :param user: InfluxDB user name
    :param password: InfluxDB password
    :param db: database to write into
    :param ssl: connect over SSL
    :param verify: verify the server's SSL certificate
    :param stats: list of point dicts, as produced by ``gen_influx_data``
    """
    influx = InfluxDBClient(host=host, port=port, username=user,
                            password=password, database=db, ssl=ssl,
                            verify_ssl=verify)
    # Write the points handed in by the caller rather than relying on any
    # module-level variable, so the function also works when imported.
    influx.write_points(stats)
def gen_influx_data(stats, now):
    """Turn ``rados_bench`` results into InfluxDB points.

    One ``ceph_pool_latency`` point is produced per pool. Each point is
    tagged with the pool name and the cluster fsid, carries ``now`` as its
    time, and holds the write and read latency as fields.

    :param stats: dict with keys ``'fsid'`` and ``'pools'``, as returned by
        ``rados_bench``
    :param now: timestamp to use for every point
    :returns: list of point dicts accepted by ``InfluxDBClient.write_points``
    """
    return [
        {
            'measurement': 'ceph_pool_latency',
            'tags': {'pool': pool_name, 'ceph_fsid': stats['fsid']},
            'time': now,
            'fields': {'write_latency': latency['write_latency'],
                       'read_latency': latency['read_latency']},
        }
        for pool_name, latency in stats['pools'].items()
    ]
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Measure Ceph RADOS latency and send to Influx')
    parser.add_argument("--host", action="store", dest="influx_host", default="localhost", help="The Influx host")
    parser.add_argument("--port", action="store", dest="influx_port", type=int, default=8086, help="The Influx port")
    parser.add_argument("--user", action="store", dest="influx_user", help="The Influx user")
    parser.add_argument("--password", action="store", dest="influx_password", help="The Influx password")
    parser.add_argument("--db", action="store", dest="influx_db", default="ceph", help="The InfluxDB database")
    parser.add_argument("--ssl", action="store_true", dest="influx_ssl", help="Enable SSL for Influx")
    parser.add_argument("--verify", action="store_true", dest="influx_verify", help="Verify SSL")
    parser.add_argument("-n", "--dry", action="store_true", dest="dry", help="Dry run, do not send to Influx")
    parser.add_argument("--size", action="store", dest="object_size", type=int, default=1, help="The object size in KiloBytes")
    parser.add_argument("--num", action="store", dest="num_objects", type=int, default=10, help="The number of objects to write and read")
    parser.add_argument("-p", "--pool", action="append", dest="rados_pool", required=True, help="The RADOS pool to write in. Specify multiple times for more pools")
    parser.add_argument("-c", "--conf", action="store", dest="ceph_conf", default="/etc/ceph/ceph.conf", help="The ceph configuration file")
    parser.add_argument("-i", "--id", action="store", dest="rados_id", default="admin", help="Cephx ID to use")
    parser.add_argument("-d", "--debug", action="store_true", dest="debug", help="Debug logging")
    conf = parser.parse_args()

    # Honour --debug: let this script's DEBUG records (e.g. the points about
    # to be sent) through to the handler configured by basicConfig.
    if conf.debug:
        log.setLevel(logging.DEBUG)

    try:
        stats = rados_bench(ceph_conf=conf.ceph_conf, pools=conf.rados_pool,
                            rados_id=conf.rados_id,
                            object_size=conf.object_size,
                            num_objects=conf.num_objects)
        data = gen_influx_data(stats, datetime.datetime.utcnow().isoformat())
        if not conf.dry:
            log.debug(data)
            send_to_influx(conf.influx_host, conf.influx_port, conf.influx_user,
                           conf.influx_password, conf.influx_db, conf.influx_ssl,
                           conf.influx_verify, data)
        else:
            log.info(data)
    except Exception as e:
        # Log the failure first, then re-raise so the script still exits
        # with an error and a traceback.
        log.error(e)
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment