Skip to content

Instantly share code, notes, and snippets.

@jd
Created February 8, 2018 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jd/7466cebd1c7fb3a59f9c4e768572ce5d to your computer and use it in GitHub Desktop.
Save jd/7466cebd1c7fb3a59f9c4e768572ce5d to your computer and use it in GitHub Desktop.
Gnocchi profiling tool
# -*- encoding: utf-8 -*-
import cProfile
import random
import uuid
import daiquiri
import numpy
from gnocchi.cli import metricd
from gnocchi import incoming
from gnocchi import indexer
from gnocchi import storage
from gnocchi import service
from gnocchi import utils
LOG = daiquiri.getLogger("gnocchi.benchmark")

# Base timestamp from which the generated measures are offset.
NOW = numpy.datetime64(utils.utcnow())

# Number of measures generated for each metric.
MEASURES_PER_METRIC = 60

# Simulated topology: every node carries 100 metrics of its own, plus
# 30 metrics for each of the VMs it hosts.
_NODES = 1
_VM_PER_NODES = 10
METRICS = _NODES * (100 + _VM_PER_NODES * 30)

# Archive policy name passed to the indexer for every created metric.
ARCHIVE_POLICY_NAME = "bool"

# Service configuration object handed to the drivers set up afterwards.
CONF = service.prepare_service()
# Instantiate the incoming driver, coordinator, storage driver and indexer
# from the shared configuration, in that order.
LOG.debug("Setting up incoming")
INCOMING = incoming.get_driver(CONF)

LOG.debug("Setting up coordinator")
# A random UUID is used as this process' coordinator member id.
_member_id = str(uuid.uuid4())
COORD = metricd.get_coordinator_and_start(_member_id, CONF.coordination_url)

LOG.debug("Setting up storage")
STORAGE = storage.get_driver(CONF)

LOG.debug("Setting up indexer")
INDEXER = indexer.get_driver(CONF)
# Create METRICS metrics through the indexer and keep only their ids,
# timing the whole operation.
LOG.info("Creating %d metrics", METRICS)
sw = utils.StopWatch().start()
metrics = []
for _ in range(METRICS):
    created = INDEXER.create_metric(
        uuid.uuid4(), "admin", ARCHIVE_POLICY_NAME)
    metrics.append(created.id)
LOG.info("Created %d metrics in %.2fs", METRICS, sw.elapsed())
# Build, for every metric id, MEASURES_PER_METRIC measures with random
# integer values, spaced one second apart starting at NOW.
LOG.info("Generating %d measures per metric for %d metrics… ",
         MEASURES_PER_METRIC, METRICS)
sw.reset()
# numpy.timedelta64 takes (value, unit) positionally and has no `seconds`
# keyword, so the per-second offset has to be written as (s, 's'); with
# the keyword form the offset was never applied as intended.
measures = {
    m_id: [
        incoming.Measure(
            NOW + numpy.timedelta64(s, 's'),
            random.randint(-999999, 999999))
        for s in range(MEASURES_PER_METRIC)
    ]
    for m_id in metrics
}
LOG.info("… done in %.2fs", sw.elapsed())
# Push every generated measure to the incoming driver in a single batch
# and report how long the push took.
sw.reset()
INCOMING.add_measures_batch(measures)
total_measures = sum(len(batch) for batch in measures.values())
LOG.info("Pushed %d measures in %.2fs", total_measures, sw.elapsed())
# Process every sack of pending measures under its lock, optionally under
# cProfile, then report the elapsed time and throughput.
sw.reset()

PROF = None
# Uncomment to enable profiling
# PROF = cProfile.Profile()
if PROF is not None:
    PROF.enable()

for sack in range(INCOMING.NUM_SACKS):
    LOG.debug("Getting lock for sack %d", sack)
    sack_lock = INCOMING.get_sack_lock(COORD, sack)
    acquired = sack_lock.acquire(blocking=True)
    if not acquired:
        raise RuntimeError("Unable to acquire lock for sack %s" % sack)
    try:
        pending = INCOMING.list_metric_with_measures_to_process(sack)
        STORAGE.process_new_measures(INDEXER, INCOMING, pending)
        INCOMING.finish_sack_processing(sack)
        LOG.debug("Processed sack %d", sack)
    finally:
        # Always give the lock back, even when processing raised.
        sack_lock.release()
        LOG.debug("Released lock for sack %d", sack)

if PROF is not None:
    PROF.disable()
    # The stats file is written next to this script.
    PROF.dump_stats("%s.cprof" % __file__)

end = sw.elapsed()
LOG.info("Processed %d sacks in %.2fs", INCOMING.NUM_SACKS, end)
rate = float(total_measures) / end
LOG.info("Speed: %.2f measures/s", rate)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment