Skip to content

Instantly share code, notes, and snippets.

@jd jd/micro-metricd.py
Created Feb 8, 2018

Embed
What would you like to do?
Gnocchi profiling tool
# -*- encoding: utf-8 -*-
import cProfile
import random
import uuid
import daiquiri
import numpy
from gnocchi.cli import metricd
from gnocchi import incoming
from gnocchi import indexer
from gnocchi import storage
from gnocchi import service
from gnocchi import utils
# Benchmark configuration and driver setup. Everything here runs at import
# time and talks to whatever services the Gnocchi configuration points at.
LOG = daiquiri.getLogger("gnocchi.benchmark")
# Reference timestamp; the generated measures are offset from it.
NOW = numpy.datetime64(utils.utcnow())
# Number of measures generated per metric, one per second offset from NOW.
MEASURES_PER_METRIC = 60
_NODES = 1
_VM_PER_NODES = 10
# 100 metrics per node, plus 30 metrics per VM with _VM_PER_NODES VMs on each
# node (400 metrics with the values above).
METRICS = _NODES * 100 + _VM_PER_NODES * _NODES * 30
# Archive policy name passed to the indexer for every created metric.
# NOTE(review): presumably this policy must already exist — confirm.
ARCHIVE_POLICY_NAME = "bool"
CONF = service.prepare_service()
LOG.debug("Setting up incoming")
INCOMING = incoming.get_driver(CONF)
LOG.debug("Setting up coordinator")
# A random UUID string is passed as the first argument (presumably the
# coordination member id — verify against gnocchi.cli.metricd).
COORD = metricd.get_coordinator_and_start(str(uuid.uuid4()),
CONF.coordination_url)
LOG.debug("Setting up storage")
STORAGE = storage.get_driver(CONF)
LOG.debug("Setting up indexer")
INDEXER = indexer.get_driver(CONF)
# Phase 1: create METRICS metrics through the indexer, keeping only their ids.
LOG.info("Creating %d metrics", METRICS)
sw = utils.StopWatch().start()
metrics = [
    INDEXER.create_metric(uuid.uuid4(), "admin", ARCHIVE_POLICY_NAME).id
    for _ in range(METRICS)
]
LOG.info("Created %d metrics in %.2fs", METRICS, sw.elapsed())

# Phase 2: build MEASURES_PER_METRIC random measures for every metric id,
# spaced one second apart starting at NOW.
LOG.info("Generating %d measures per metric for %d metrics… ",
         MEASURES_PER_METRIC, METRICS)
sw.reset()  # presumably restarts the stopwatch for this phase
measures = {
    m_id: [
        incoming.Measure(
            # numpy.timedelta64 takes (value, unit) positionally and has no
            # `seconds` keyword, so the offset must be spelled this way to
            # give each measure a distinct timestamp.
            NOW + numpy.timedelta64(offset, "s"),
            random.randint(-999999, 999999),
        )
        for offset in range(MEASURES_PER_METRIC)
    ]
    for m_id in metrics
}
LOG.info("… done in %.2fs", sw.elapsed())

# Phase 3: push all generated measures to the incoming driver in one batch.
sw.reset()
INCOMING.add_measures_batch(measures)
total_measures = sum(map(len, measures.values()))
LOG.info("Pushed %d measures in %.2fs",
         total_measures,
         sw.elapsed())
sw.reset()
# Phase 4: process every sack of pending measures, optionally under cProfile.
PROF = None
# Uncomment to enable profiling
# PROF = cProfile.Profile()

if PROF is not None:
    PROF.enable()
try:
    for sack in range(INCOMING.NUM_SACKS):
        LOG.debug("Getting lock for sack %d", sack)
        lock = INCOMING.get_sack_lock(COORD, sack)
        if not lock.acquire(blocking=True):
            raise RuntimeError("Unable to acquire lock for sack %s" % sack)
        try:
            # Use a dedicated name so the module-level `metrics` id list is
            # not overwritten by the per-sack listing.
            sack_metrics = INCOMING.list_metric_with_measures_to_process(sack)
            STORAGE.process_new_measures(INDEXER, INCOMING, sack_metrics)
            INCOMING.finish_sack_processing(sack)
            LOG.debug("Processed sack %d", sack)
        finally:
            # Always release the sack lock, even if processing failed.
            lock.release()
            LOG.debug("Released lock for sack %d", sack)
    # Read the stopwatch before the profile is written to disk so the
    # reported speed does not include the time spent dumping stats.
    end = sw.elapsed()
finally:
    # Stop the profiler and keep whatever was collected, even when
    # processing raised part-way through.
    if PROF is not None:
        PROF.disable()
        PROF.dump_stats("%s.cprof" % __file__)

LOG.info("Processed %d sacks in %.2fs", INCOMING.NUM_SACKS, end)
LOG.info("Speed: %.2f measures/s", float(total_measures) / end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.