Skip to content

Instantly share code, notes, and snippets.

@mbaldessari
Created September 27, 2014 09:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mbaldessari/30dc7ae2fe46d9b804f2 to your computer and use it in GitHub Desktop.
Save mbaldessari/30dc7ae2fe46d9b804f2 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from __future__ import print_function
import cProfile
from datetime import datetime
import os
import pstats
import resource
import StringIO
import sys
import time
import cpmapi as c_api
from pcp import pmapi
# Gates the cProfile report: Test.profile_report() only stops the profiler and
# prints statistics when this is true.
USE_PROFILER = True
# Limit handed to pstats' print_stats() when the profiling report is printed.
TOP_PROFILED_FUNCTIONS = 15
class _Options(object):
    """Holder for the command line settings of this script.

    Creating an instance builds a ``pmapi.pmOptions`` object (kept in
    ``self.opts``) whose callbacks store the values of the -S, -T, -t and -a
    options on this instance.
    """

    def __init__(self):
        # Values filled in by option_callback().
        self.input_file = ""
        self.start_time = None
        self.end_time = None
        self.interval = None
        # Settings that no command line option in this class modifies.
        self.include = []
        self.exclude = []
        self.custom_graphs = []
        self.raw = False
        self.output_file = "output.pdf"
        # Must come last: setup() hands bound methods of self to pmOptions.
        self.opts = self.setup()

    def setup(self):
        """Create and return the pmOptions object used for argument parsing."""
        options = pmapi.pmOptions()
        options.pmSetOptionCallback(self.option_callback)
        options.pmSetOverrideCallback(self.override)
        options.pmSetShortOptions("?S:T:t:a:")
        for flag in (c_api.PM_OPTFLAG_BOUNDARIES, c_api.PM_OPTFLAG_MULTI):
            options.pmSetOptionFlags(flag)
        # Long options are registered in this exact order.
        registrations = (
            options.pmSetLongOptionStart,
            options.pmSetLongOptionFinish,
            options.pmSetLongOptionInterval,
            options.pmSetLongOptionArchive,
            options.pmSetLongOptionVersion,
            options.pmSetLongOptionHelp,
        )
        for register in registrations:
            register()
        return options

    def override(self, opt):
        """Override callback registered with pmOptions; always returns 0."""
        return 0

    def option_callback(self, opt, optarg, index):
        """Record the argument of a single command line option.

        Only "S", "T", "t" and "a" are stored (as start_time, end_time,
        interval and input_file respectively); any other option is ignored.
        """
        # pylint: disable=W0613
        attribute_for_option = {
            "S": "start_time",
            "T": "end_time",
            "t": "interval",
            "a": "input_file",
        }
        attribute = attribute_for_option.get(opt)
        if attribute is not None:
            setattr(self, attribute, optarg)
class Test(object):
    """Read every metric of a PCP archive while measuring time and memory.

    The archive and the time window come from the module-level ``opts``
    object (an ``_Options`` instance). Resource usage is printed when the
    object is created and again when parse() finishes; when USE_PROFILER is
    true a cProfile report can be printed with profile_report().
    """

    def __init__(self):
        # Metric name -> (type, sem, units); filled in by pmns_callback().
        self.pmns = {}
        # Number of metrics seen while traversing the PMNS.
        self.counter = 0
        # PMAPI context; created by parse().
        self.context = None
        self.profile = cProfile.Profile()
        self.start_time = time.time()
        usage = resource.getrusage(resource.RUSAGE_SELF)
        print("Before parsing: usertime={0} systime={1} mem={2} MB"
              .format(usage[0], usage[1], (usage[2] / 1024.0)))
        if USE_PROFILER:
            # Enable exactly once, and only when profiling was requested.
            self.profile.enable()

    def profile_report(self):
        """Stop the profiler and print its most expensive entries.

        Does nothing when USE_PROFILER is false. Entries are sorted by
        cumulative time and limited to TOP_PROFILED_FUNCTIONS.
        """
        if not USE_PROFILER:
            return
        # Local import so the report works on both Python 2 and Python 3.
        try:
            from StringIO import StringIO
        except ImportError:
            from io import StringIO
        self.profile.disable()
        str_io = StringIO()
        pstat = pstats.Stats(self.profile, stream=str_io)
        pstat.sort_stats('cumulative')
        pstat.print_stats(TOP_PROFILED_FUNCTIONS)
        print("\nProfiling of parse()")
        print(str_io.getvalue())

    def pmns_callback(self, label):
        """PMNS traversal callback: record (type, sem, units) for *label*."""
        pmid = self.context.pmLookupName(label)
        desc = self.context.pmLookupDesc(pmid[0])
        # Go through .contents consistently, as _extract_value() does.
        self.pmns[label] = (desc.contents.type, desc.contents.sem,
                            desc.contents.units)
        self.counter += 1

    def _timestamp_to_secs(self, tstamp):
        '''Convert a timestamp object (tv_sec + tv_usec) to seconds'''
        return tstamp.tv_sec + (tstamp.tv_usec * 10**-6)

    def get_metrics(self):
        '''Returns a list of metric labels of all the metrics contained in
        the archive'''
        # list() so that a real list is returned on Python 3 as well, where
        # dict.keys() is only a view.
        return list(self.pmns)

    def get_metric_info(self, metric):
        '''Given a metric label, return (type, sem, units)'''
        return self.pmns[metric]

    def get_pmids(self, metrics):
        '''Given a list of metrics, returns a list of PMIDs'''
        return self.context.pmLookupName(metrics)

    def _extract_value(self, result, desc, i, inst=0):
        '''Return python value given a pmExtractValue set of parameters

        result -- fetch result holding the value
        desc   -- descriptor of the metric; its type selects the conversion
        i      -- index of the metric within the result
        inst   -- index of the value within the metric's value list

        Raises Exception when the metric type is not one of the handled
        PM_TYPE_* constants.
        '''
        mtype = desc.contents.type
        value = self.context.pmExtractValue(
            result.contents.get_valfmt(i),
            result.contents.get_vlist(i, inst),
            mtype, mtype)
        if mtype == c_api.PM_TYPE_U64:
            retval = value.ull
        elif mtype == c_api.PM_TYPE_U32:
            retval = value.ul
        elif mtype == c_api.PM_TYPE_64:
            retval = value.ll
        elif mtype == c_api.PM_TYPE_32:
            retval = value.l
        elif mtype == c_api.PM_TYPE_STRING:
            retval = value.cp
        elif mtype == c_api.PM_TYPE_FLOAT:
            retval = value.f
        elif mtype == c_api.PM_TYPE_DOUBLE:
            retval = value.d
        else:
            raise Exception("Metric has unknown type: [%s]" % (mtype))
        return retval

    def _store_result(self, result, ts, data, skipped_metrics, indom_map):
        '''Append all values of one fetched result to *data* (in place).

        ts is the datetime of the sample. Metrics whose value cannot be
        converted (PM_ERR_CONV) are recorded once in skipped_metrics.
        indom_map caches (pmid, instance id) -> instance name lookups.
        '''
        for i in range(result.contents.numpmid):
            pmid = result.contents.get_pmid(i)
            desc = self.context.pmLookupDesc(pmid)
            metric = self.context.pmNameID(pmid)
            series = data.setdefault(metric, {})
            count = result.contents.get_numval(i)
            # FIXME: double-check this (no instance whatsoever)
            if count <= 0:
                continue
            for j in range(count):
                try:
                    value = self._extract_value(result, desc, i, j)
                except pmapi.pmErr as error:
                    # Only conversion failures are tolerated; anything else
                    # must not be silently ignored.
                    if error.args[0] != c_api.PM_ERR_CONV:
                        raise
                    if metric not in skipped_metrics:
                        skipped_metrics.append(metric)
                    continue
                if count == 1:
                    # A single value is treated as "no indom" and stored
                    # under key 0.
                    # NOTE(review): a metric with an indom that currently has
                    # one instance also ends up here - confirm this is wanted.
                    key = 0
                else:
                    inst = result.contents.get_inst(i, j)
                    # Cache on the identity of the instance rather than its
                    # position in the result, which is not a stable key.
                    cache_key = (pmid, inst)
                    if cache_key not in indom_map:
                        indom_map[cache_key] = \
                            self.context.pmNameInDomArchive(desc, inst)
                    key = indom_map[cache_key]
                timestamps, values = series.setdefault(key, [[], []])
                timestamps.append(ts)
                values.append(value)

    def parse(self):
        '''Read all the data within a PCP archive log file.

        Returns a tuple (data, skipped_metrics). skipped_metrics is a list
        of metrics with at least one value that could not be converted.
        data has the following form:
        data[metric1] = {'indom1': [[ts0, ts1, .., tsN], [v0, v1, .., vN]],
                         ....
                         'indomN': [[ts0, ts1, .., tsN], [v0, v1, .., vN]]}
        data[metric2] = {'indom1': [[ts0, ts1, .., tsX], [v0, v1, .., vX]],
                         ....
                         'indomN': [[ts0, ts1, .., tsX], [v0, v1, .., vX]]}
        [ts0, .., tsN] are timestamps in datetime format and [v0, .., vN] are
        the actual values. If a metric has a single value, 0 is used as its
        key.

        Reading stops at the end of the archive or at a corrupted record;
        any other pmapi.pmErr is propagated. Samples outside the start/end
        window taken from the command line options are ignored. A summary
        and the resource usage after parsing are printed.
        '''
        self.context = pmapi.pmContext.fromOptions(opts.opts, sys.argv)
        self.context.pmTraversePMNS('', self.pmns_callback)
        self.start = opts.opts.pmGetOptionStart()
        self.end = opts.opts.pmGetOptionFinish()
        self.interval = opts.opts.pmGetOptionInterval()
        print('Userdefined Start: {0} - End: {1} - Interval: {2}'.format(
            self.start, self.end, self.interval))
        self.context.pmSetMode(c_api.PM_MODE_FORW, self.start, 0)

        # Everything below is loop invariant, so compute it only once.
        metrics = self.get_metrics()
        pmids = self.get_pmids(metrics)
        start_secs = float(self.start)
        end_secs = float(self.end)

        data = {}
        skipped_metrics = []
        # This is just used as an optimization. The keys are (pmid, inst) and
        # the value is the indom name. This avoids too many expensive calls
        # to pmNameInDomArchive
        indom_map = {}
        while True:
            try:
                # Trying to do this without pmFetchArchive() which does not
                # support INTERP mode
                result = self.context.pmFetch(pmids)
            except pmapi.pmErr as error:
                # Exit if we are at the end of the file or if the record is
                # corrupted. Signal any other issues
                if error.args[0] in (c_api.PM_ERR_EOL, c_api.PM_ERR_LOGREC):
                    break
                raise
            try:
                secs = self._timestamp_to_secs(result.contents.timestamp)
                if start_secs <= secs <= end_secs:
                    self._store_result(result, datetime.fromtimestamp(secs),
                                       data, skipped_metrics, indom_map)
            finally:
                # Release the result even when processing the sample failed.
                self.context.pmFreeResult(result)

        print("data: %s - skipped: %s" % (len(data), len(skipped_metrics)))
        usage = resource.getrusage(resource.RUSAGE_SELF)
        print("After parsing: usertime={0} systime={1} mem={2} MB"
              .format(usage[0], usage[1], (usage[2] / 1024.0)))
        return (data, skipped_metrics)
if __name__ == '__main__':
    # Test.parse() reads this module-level name, so it has to be bound here
    # before parse() is called. (No "global" statement is needed at module
    # level.)
    opts = _Options()
    if c_api.pmGetOptionsFromList(sys.argv) != 0:
        c_api.pmUsageMessage()
        sys.exit(1)

    pcp_files = opts.opts.pmGetOptionArchives()
    # Covers both "no archive option given" (None) and an empty list.
    if not pcp_files:
        print("Error: No pcp archives specified")
        c_api.pmUsageMessage()
        sys.exit(1)
    if len(pcp_files) == 1 and not os.path.exists(pcp_files[0]):
        print("Path does not exist: {0}".format(pcp_files[0]))
        sys.exit(1)

    # Size in MB (of the first archive only)
    size = os.stat(pcp_files[0]).st_size / (1024.0 ** 2)
    print("Parsing files: {0} - {1} MB".format(
        " ".join(map(os.path.basename, pcp_files)), size))

    test = Test()
    test.parse()
    test.profile_report()
# vim: autoindent tabstop=4 expandtab smarttab shiftwidth=4 softtabstop=4 tw=0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment