Skip to content

Instantly share code, notes, and snippets.

Created October 24, 2015 19:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MauricioRoman/a1a17791a0423e1f3a5a to your computer and use it in GitHub Desktop.
Save MauricioRoman/a1a17791a0423e1f3a5a to your computer and use it in GitHub Desktop.
def get_load_avg():
    """Return the system load average as a dict keyed by sampling window.

    Only the 5-minute figure is reported; the 1-minute entry was already
    commented out in the original code.

    NOTE(review): the original dict literal was cut off right after the
    "5m" entry, so any keys that may have followed it are not reproduced
    here -- confirm against the upstream file.

    Returns:
        dict: ``{"5m": <float>}`` taken from ``os.getloadavg()``.

    Raises:
        OSError: propagated from ``os.getloadavg()`` when the load average
            cannot be obtained.
    """
    # os.getloadavg() returns a 3-tuple; index 1 is the value the original
    # code published under the "5m" key.
    load_avg = os.getloadavg()
    return {"5m": load_avg[1]}
def get_disk_usage():
# Walk the partitions reported by psutil.disk_partitions(all=False), query
# psutil.disk_usage() for each mount point, and return the `disk_usage` dict.
# NOTE(review): this block is incomplete as shown:
#   * the `== 'nt'` comparison has lost its left-hand operand
#     (presumably os.name -- confirm);
#   * the cd-rom branch has no statement after its comment
#     (presumably `continue` -- confirm);
#   * nothing visible stores `usage` into `disk_usage`, so the shape of the
#     returned dict cannot be determined from here.
# Restore the missing lines from the upstream file before relying on it.
disk_usage = {}
for part in psutil.disk_partitions(all=False):
if == 'nt':
if 'cdrom' in part.opts or part.fstype == '':
# skip cd-rom drives with no disk in it; they may raise
# ENOENT, pop-up a Windows GUI error for a non-ready
# partition or just hang.
# Usage statistics for this partition's mount point.
usage = psutil.disk_usage(part.mountpoint)
return disk_usage
def get_disk_io():
# Iterate over the per-disk counters from
# psutil.disk_io_counters(perdisk=True) and return the `disk_io_cum` dict.
# NOTE(review): the loop body is missing from the visible source, so what is
# recorded for each (k, v) pair cannot be determined here -- confirm upstream.
# NOTE(review): dict.iteritems() exists only on Python 2; under Python 3 this
# call would need .items().
disk_io_cum = {}
for k,v in psutil.disk_io_counters(perdisk=True).iteritems():
return disk_io_cum
def get_process_metrics():
# Return a nested dict of metrics; the visible part places the result of
# get_load_avg() under ["process"]["load_avg"].
# NOTE(review): the dict literal is cut off after the first entry, so the
# remaining keys and the closing braces are not visible -- presumably the
# disk usage / disk I/O helpers are included as well; confirm upstream.
return {"process":{ "load_avg":get_load_avg(),
def build_consolidator_log(actual_timestamp, percent_distinct, percent_compression,
step_in_sec, topic):
# Build and return the `msg` dict for a consolidator log event; its
# "timestamp" entry is set to `actual_timestamp`.
# NOTE(review): the dict literal is truncated in the visible source -- the key
# for the rounded value below, any entries using `percent_compression` and
# `topic`, and the closing brace are missing. Confirm against upstream.
msg = {"timestamp":actual_timestamp,
# percent_distinct divided by the step length (float() forces true division),
# rounded to 2 decimal places.
np.round(percent_distinct / float(step_in_sec),2),
return msg
def build_process_log(timestamp, processed_timestamp, delay_min, processing_time_ms, topic, volume):
# Build and return the `msg` dict for a processing log event; its "timestamp"
# entry is set to `timestamp`.
# NOTE(review): the dict literal is truncated in the visible source -- the
# entries for the remaining parameters and the closing brace are missing.
# Confirm against upstream.
msg = {"timestamp":timestamp,
return msg
def send_to_skyline(series, metric_set):
    """Send every datapoint in *series* as a msgpack-encoded UDP datagram.

    Each record becomes one packet containing
    ``(metric_name, [timestamp, value])`` where ``metric_name`` is
    ``"<metric_set>.<record['metric']>"``. Packets are sent to this host's
    own name on ``SKYLINE_UDP_PORT``.

    Args:
        series: iterable of dicts, each providing the keys ``'metric'``,
            ``'timestamp'`` and ``'value'``.
        metric_set: prefix prepended (with a dot) to every metric name.

    Raises:
        KeyError: if a record lacks one of the required keys.
        socket.error: propagated from the underlying ``sendto`` call.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The destination is the same for every packet; build it once
        # instead of calling gethostname() per record.
        destination = (socket.gethostname(), SKYLINE_UDP_PORT)
        for record in series:
            metric = "%s.%s" % (metric_set, record['metric'])
            datapoint = [record['timestamp'], record['value']]
            packet = msgpack.packb((metric, datapoint))
            sock.sendto(packet, destination)
    finally:
        # Release the descriptor even when packing or sending fails; the
        # original never closed the socket. try/finally (rather than `with`)
        # keeps this working on Python 2 sockets.
        sock.close()
def flatten(d, parent_key='', sep='.'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep* (``{"a": {"b": 1}}`` becomes
    ``{"a.b": [1]}``). Every leaf value is wrapped in a one-element list,
    which lets the result be handed to ``pd.DataFrame.from_dict`` as a
    single-row frame.

    Args:
        d: mapping to flatten; keys are expected to be strings because they
            are concatenated with *sep*.
        parent_key: prefix applied to every key produced from *d*.
        sep: separator placed between nesting levels.

    Returns:
        dict: flattened ``{joined_key: [value]}`` mapping.
    """
    # collections.MutableMapping was removed in Python 3.10; fall back to the
    # old location only on interpreters that lack collections.abc.
    try:
        from collections.abc import MutableMapping
    except ImportError:  # Python 2
        from collections import MutableMapping

    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            # Forward `sep` so a custom separator applies at every depth.
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            # Only leaves are emitted; values coming back from the recursive
            # call are already wrapped, so wrapping happens exactly once.
            items.append((new_key, [v]))
    return dict(items)
def log_to_hash(msg):
# Compute an integer identifier for a log event from its string-typed fields.
# Visible steps: flatten `msg` into a DataFrame, drop the 'timestamp' column,
# keep only dtype 'object' columns, join the resulting values with '.', and
# return abs(mmh3.hash(...)) of that string.
# NOTE(review): this block is damaged in the visible source -- the second
# parenthesised chain is never closed (any reshaping steps after
# select_dtypes are missing), and the `s_hashed = ...` line is fused with the
# remains of a "Mapping %s to %d" message (presumably a logging call --
# confirm). Restore from upstream before use.
df = (pd.DataFrame
.from_dict(flatten(msg), orient='columns'))
# The timestamp is removed before the string columns are selected for hashing.
df.drop(['timestamp'], inplace=True, axis=1)
df = (df
.select_dtypes(include = ['object'])
# dict(df.values) treats each row of df.values as a (key, value) pair.
s = ".".join( dict(df.values).values() )
s_hashed = abs(mmh3.hash(s))"Mapping %s to %d" % (s, s_hashed))
return s_hashed
def log_to_series(msg, timestamp):
# Visible steps: flatten `msg` into a DataFrame, exclude 'object' and
# 'datetime64' columns, rename the columns to ['metric', 'value'], add a
# 'timestamp' column set to `timestamp`, and return the rows as a list of
# dicts (to_dict(orient='records')).
# NOTE(review): the next line is docstring text whose triple quotes were lost,
# and the parenthesised chain below is never closed -- the steps that reshape
# the frame into two columns are not visible. Confirm against upstream.
Transforms a single log event into a series of datapoints
df = (pd.DataFrame
.from_dict(flatten(msg), orient='columns')
.select_dtypes(exclude = ['object','datetime64'])
# Assigning two names requires the frame to have exactly two columns here.
df.columns = ['metric','value']
# Every datapoint of the event shares the same timestamp.
df['timestamp'] = timestamp
return df.to_dict(orient='records')
def date_to_unix(timestamp):
    """Convert a datetime to whole seconds elapsed since 1970-01-01 00:00:00.

    Args:
        timestamp: a ``datetime.datetime``. A naive value is measured
            directly against the naive epoch (so it is effectively treated
            as UTC). A timezone-aware value is shifted to UTC first; the
            original raised ``TypeError`` for such input.

    Returns:
        int: elapsed seconds, with any fractional part truncated by ``int()``.
    """
    if timestamp.tzinfo is not None:
        # Aware and naive datetimes cannot be subtracted from each other, so
        # normalise to naive UTC. utcoffset() may return None for tzinfo
        # objects that do not define an offset; treat that as zero.
        offset = timestamp.utcoffset() or datetime.timedelta(0)
        timestamp = timestamp.replace(tzinfo=None) - offset
    epoch = datetime.datetime(1970, 1, 1)
    return int((timestamp - epoch).total_seconds())
def log_via_anomaly_detector(msg_list, actual_time, tag):
# For each message: turn it into datapoints with log_to_series() (stamped
# with date_to_unix(actual_time)), compute its hash with log_to_hash(), and
# send the datapoints to Skyline under the metric set "id.<hash>". Afterwards
# `logs_to_send` is passed to send_log_bulk_data() and that call's status is
# returned.
# NOTE(review): this block is damaged in the visible source -- the three lines
# below are docstring text whose triple quotes were lost, the `try:` matching
# the `except` is missing, and nothing visible appends to `logs_to_send`
# (so the rule deciding which messages are forwarded cannot be determined
# here). Confirm against upstream.
We hash the categorical features and extract the metrics
using the hash to identify each metric
as well as to identify the log event which generated it
logs_to_send = []
for msg in msg_list:
series = log_to_series(msg,date_to_unix(actual_time))
metric_hash = log_to_hash(msg)
send_to_skyline(series, "id.%d"%metric_hash)
# Failures are logged as warnings rather than re-raised.
except Exception as e:
log.warn("Exception: %s" %format_exc(e))
status = send_log_bulk_data(Loggly_Token, logs_to_send, False, tag)
return status
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment