Skip to content

Instantly share code, notes, and snippets.

@cdahlqvist
Last active May 17, 2017 10:50
Show Gist options
  • Save cdahlqvist/76692b2274c062e9bb5864618e516272 to your computer and use it in GitHub Desktop.
Save cdahlqvist/76692b2274c062e9bb5864618e516272 to your computer and use it in GitHub Desktop.
Script to transform Metricbeat system metrics into multiple formats for benchmarking
#!/usr/bin/env python
import collections
import datetime
import hashlib
import json
import md5
import re
import sys
epoch = datetime.datetime.utcfromtimestamp(0)
# The mapping ABCs live in collections.abc on Python 3; the old
# collections.MutableMapping alias was removed in Python 3.10.
try:
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:  # Python 2
    from collections import MutableMapping as _MutableMapping


def flatten(d, parent_key='', sep='.'):
    """Collapse a nested mapping into a single-level dict.

    Keys of nested mappings are joined to their parent's key with *sep*,
    e.g. ``{'a': {'b': 1}}`` becomes ``{'a.b': 1}``.

    Args:
        d: Mapping to flatten. Values that are mutable mappings are
            descended into; every other value is copied as a leaf.
        parent_key: Prefix for every produced key; an empty string means
            no prefix (and no leading separator).
        sep: Separator placed between key components.

    Returns:
        A new dict mapping the joined key paths to the leaf values. Empty
        nested mappings contribute no entries.
    """
    flat = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, _MutableMapping):
            flat.update(flatten(v, new_key, sep=sep))
        else:
            flat[new_key] = v
    return flat
# Naive UTC epoch used as the reference point for timestamp_to_ms().
_EPOCH = datetime.datetime(1970, 1, 1)


def timestamp_to_ms(ts):
    """Convert an ISO-8601 UTC timestamp string to epoch milliseconds.

    Args:
        ts: Timestamp such as ``'2017-05-17T10:50:00.123Z'``. The fractional
            seconds part is optional.

    Returns:
        Whole milliseconds elapsed since 1970-01-01T00:00:00 UTC as an int;
        sub-millisecond digits are discarded (floored).

    Raises:
        ValueError: If *ts* does not match the expected layout.
    """
    # strptime's %Z does not accept the literal 'Z', but it accepts 'UTC'.
    ts_utc = ts.replace('Z', 'UTC')
    if '.' in ts_utc:
        layout = '%Y-%m-%dT%H:%M:%S.%f%Z'
    else:
        layout = '%Y-%m-%dT%H:%M:%S%Z'
    delta = datetime.datetime.strptime(ts_utc, layout) - _EPOCH
    # Integer arithmetic on the timedelta fields: going through
    # total_seconds() * 1000.0 and int() can lose a millisecond to float
    # rounding (1.001 * 1000.0 == 1000.9999999999999).
    return (delta.days * 86400000
            + delta.seconds * 1000
            + delta.microseconds // 1000)
def is_not_number(s):
    """Return True when *s* cannot be converted to a float.

    Args:
        s: Any value, typically a leaf taken from a decoded JSON document.

    Returns:
        False if ``float(s)`` succeeds, True otherwise.
    """
    try:
        float(s)
    except (TypeError, ValueError):
        # ValueError: non-numeric text. TypeError: values float() does not
        # accept at all, such as None (JSON null) or a list.
        return True
    return False
# NOTE(review): this pattern is never used below. It was presumably meant for
# escaping tag values; string values are currently rewritten with
# str.replace() for ',' and '=' only, and spaces are left as they are.
regexp = re.compile(r'[ ,=]')

# Text type usable with isinstance() on both Python 2 and Python 3.
try:
    _STRING_TYPES = basestring
except NameError:
    _STRING_TYPES = str

# Fields under rec['system'][<section>] that are replaced by their MD5 digest.
_OBFUSCATED_SYSTEM_FIELDS = (
    ('process', ('name', 'cmdline')),
    ('network', ('name',)),
    ('filesystem', ('device_name', 'mount_point')),
)

# Per-metricset document and metric counters, plus a grand total.
stats = {'total': {'docs': 0, 'metrics': 0}}


def _md5_hex(value):
    """Return the hex MD5 digest of *value*, UTF-8 encoding text first."""
    if not isinstance(value, bytes):
        value = value.encode('utf-8')
    return hashlib.md5(value).hexdigest()


def _obfuscate(container, *fields):
    """Replace each of *fields* present in *container* with its MD5 digest."""
    for field in fields:
        if field in container:
            container[field] = _md5_hex(container[field])


def _take(source, required, optional=()):
    """Move keys out of *source* into a new dict and return it.

    Keys in *required* raise KeyError when missing; keys in *optional* are
    moved only when present.
    """
    taken = {}
    for key in required:
        taken[key] = source.pop(key)
    for key in optional:
        if key in source:
            taken[key] = source.pop(key)
    return taken


# Open output files; the with-statement closes them even if a record fails.
with open('metricbeat_full.json', 'w') as mbf, \
        open('metricbeat_metrics.json', 'w') as mbm, \
        open('influxdb_full.txt', 'w') as idf, \
        open('influxdb_metrics.txt', 'w') as idm:
    for line in sys.stdin:
        if not line.strip():
            # Tolerate blank lines instead of failing in json.loads().
            continue
        rec = json.loads(line)

        # Remove select tags
        rec.pop('tags', None)
        rec['metricset'].pop('rtt', None)
        # The 'meta' section is not written to any of the outputs.
        rec.pop('meta', None)

        # Obfuscate select fields
        _obfuscate(rec['beat'], 'name', 'hostname')
        system = rec['system']
        for section, fields in _OBFUSCATED_SYSTEM_FIELDS:
            if section in system:
                _obfuscate(system[section], *fields)

        # Process record
        ts = rec['@timestamp']

        # Write anonymized record before further processing
        mbf.write("{}\n".format(json.dumps(rec)))

        # Remove parts not to be flattened
        del rec['@timestamp']
        del rec['system']

        # Move identifying (non-metric) fields out of `system`; they are put
        # back on the record under rec['system'] so they end up in the
        # key-value list rather than among the metric values.
        tags = {}

        # Extract file system mount point and device name from system
        filesystem = system.get('filesystem', {})
        if 'mount_point' in filesystem:
            tags['filesystem'] = _take(filesystem,
                                       ('mount_point', 'device_name'))

        # Extract process data
        if 'process' in system:
            process = system['process']
            process_tags = _take(process, ('state', 'username', 'name'))
            process_tags['cpu'] = {
                'start_time': process['cpu'].pop('start_time'),
            }
            process_tags.update(
                _take(process, (), ('cmdline', 'pid', 'ppid', 'pgid')))
            tags['process'] = process_tags

        # Extract diskio name
        if 'diskio' in system:
            tags['diskio'] = _take(system['diskio'],
                                   ('name',), ('serial_number',))

        # Extract network name
        if 'network' in system:
            tags['network'] = _take(system['network'], ('name',))

        if tags:
            rec['system'] = tags

        # Flatten system and rec dicts
        flat_rec = flatten(rec)
        flat_system = flatten(system, 'system')
        flat_points = flatten(system)
        metricset = "{}.{}".format(rec['metricset']['module'],
                                   rec['metricset']['name'])

        # Record stats
        set_stats = stats.setdefault(rec['metricset']['name'],
                                     {'docs': 0, 'metrics': 0})
        set_stats['docs'] += 1
        set_stats['metrics'] += len(flat_system)
        stats['total']['docs'] += 1
        stats['total']['metrics'] += len(flat_system)

        # Create InfluxDB key-value list
        kv_list = []
        for key, val in flat_rec.items():
            if isinstance(val, _STRING_TYPES):
                # ',' and '=' are the separators used in the key-value list.
                val = val.replace(',', ';').replace('=', ':')
            kv_list.append("{}={}".format(key, val))
        idm_kvstring = ",".join(kv_list)
        idm_ts = timestamp_to_ms(ts)

        # Create points list
        points_list = []
        for key, val in flat_points.items():
            points_list.append("{}={}".format(key, val))
            if is_not_number(val):
                print("VALUE ERROR ==> {} => {}".format(key, val))
        idm_pointsstring = ",".join(points_list)

        # Create and write full InfluxDB records
        idfrec = "{},{} {} {}".format(metricset, idm_kvstring,
                                      idm_pointsstring, idm_ts)
        idf.write("{}\n".format(idfrec))

        # Write InfluxDB metrics records (one line per metric)
        for key, val in flat_system.items():
            idm.write("{},{} value={} {}\n".format(key, idm_kvstring,
                                                   val, idm_ts))

        # Write metricbeat metrics records (one document per metric)
        rec['@timestamp'] = ts
        for key, val in flat_system.items():
            # Floats go to 'float_value', everything else to 'long_value';
            # the field left over from the previous metric is removed.
            if isinstance(val, float):
                rec.pop('long_value', None)
                rec['metric'] = key
                rec['float_value'] = val
            else:
                rec.pop('float_value', None)
                rec['metric'] = key
                rec['long_value'] = val
            mbm.write("{}\n".format(json.dumps(rec)))

print(json.dumps(stats))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment