Skip to content

Instantly share code, notes, and snippets.

@opensourcegeek
Created November 14, 2016 19:41
Show Gist options
  • Save opensourcegeek/47d96302407985f69d82f5df9a6fb75a to your computer and use it in GitHub Desktop.
Save opensourcegeek/47d96302407985f69d82f5df9a6fb75a to your computer and use it in GitHub Desktop.
Python code to look for bacons and sausages
import os
import thread            # Python 2 low-level threading module ('_thread' in Python 3)
from Queue import Queue  # Python 2 module name ('queue' in Python 3)
from collections import deque
import time
from bottle import route, run, template
from daemonizer import Daemonizer

# Rolling-window size for the stats: one sample per second -> one minute.
MAX_Q_LENGTH = 60 # rolling avg for a minute
# Directory containing this script; the logs/ directory lives beneath it.
__HOME__ = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
class CircularQ:
    """Fixed-capacity rolling window: newest items on the left, oldest
    evicted from the right once the window is full.

    Improvement: the original manually popped from the right before each
    appendleft; ``collections.deque(maxlen=...)`` performs exactly that
    bounded-length eviction natively (and in C).
    """

    def __init__(self, qsize):
        # A maxlen-bounded deque discards from the opposite (right) end
        # automatically when appendleft() would exceed the capacity.
        self.items = deque(maxlen=qsize)
        # Kept for backward compatibility with any caller reading it.
        self.max_len = qsize

    def put_item(self, item):
        """Insert *item* as the newest element, dropping the oldest if full."""
        self.items.appendleft(item)

    def get_items(self):
        """Return the underlying deque (newest first)."""
        return self.items
def group_by_bacon(all_files):
    """Bucket log file names by bacon serial.

    Only names starting with ``bacon_utilization`` are kept; the serial
    is the third underscore-separated token of the file name.
    """
    buckets = {}
    for fname in all_files:
        if not fname.startswith('bacon_utilization'):
            continue
        serial = fname.split('_')[2]
        buckets.setdefault(serial, []).append(fname)
    return buckets
def producer(name, items_q):
    """Poll the logs/ directory once a second and enqueue one reading
    dict per bacon: ``{'bacon': serial, 'val-<metric>': int, ...}``.

    Fixes over the original:
    - files are opened with ``with`` so the descriptor is closed even
      when read/parse raises (the original leaked the handle);
    - Python-3-compatible ``except ... as e`` syntax;
    - the swallow message now names the file and the error instead of a
      bare "Cannot read from file".
    """
    while True:
        directory_name = os.path.join(__HOME__, 'logs')
        all_files = os.listdir(directory_name)
        # Separate log files per sausage and total, so group them by
        # bacon (serial) and loop through them to get data.
        grouped_by_bacon = group_by_bacon(all_files)
        for bacon, all_sausage_files in grouped_by_bacon.items():
            all_files_data = {
                'bacon': bacon
            }
            for fname in all_sausage_files:
                try:
                    with open(os.path.join(directory_name, fname), 'r') as f:
                        all_content = f.read()
                    print(all_content)
                    if ',' in all_content:
                        # NB: local_date_time still needs to be handled so
                        # stale readings are skipped -- TODO confirm intent.
                        local_date_time, val = all_content.split(',')
                        # File name looks like
                        # bacon_utilization_<serial>_<metric>.<ext>
                        metric_index = fname.split("_")[3].split('.')[0]
                        key = 'val-' + str(metric_index)
                        all_files_data.update({key: int(val)})
                except Exception as e:
                    print("Cannot read from file %s: %s" % (fname, e))
            items_q.put_nowait(all_files_data)
        time.sleep(1)
def get_all_value_keys(values_dict):
    """Return the keys of *values_dict* that carry metric values
    (those starting with ``'val'``)."""
    return [key for key in values_dict.keys() if key.startswith('val')]
def consumer(name, items_q, processed_items):
    """Drain reading dicts from *items_q* into per-bacon rolling windows.

    ``processed_items`` maps bacon serial -> {val-key -> CircularQ}.

    Bug fixed: the original only created the CircularQ for a val-key
    inside the ``else`` branch of the "bacon already present" check, so
    the very first reading for a brand-new bacon hit a KeyError on
    ``put_item`` (silently swallowed by the broad except) and every
    value in that reading was dropped.  Both existence checks now run
    unconditionally.  Also switched to Python-3-compatible
    ``except ... as e`` syntax.
    """
    while True:
        try:
            reading = items_q.get_nowait()
            if reading is not None:
                bacon = reading['bacon']
                for v_key in get_all_value_keys(reading):
                    if bacon not in processed_items:
                        processed_items[bacon] = {}
                    if v_key not in processed_items[bacon]:
                        processed_items[bacon][v_key] = CircularQ(MAX_Q_LENGTH)
                    processed_items[bacon][v_key].put_item(reading[v_key])
        except Exception as e:
            # get_nowait() raises queue.Empty when idle; printing also
            # surfaces unexpected failures instead of killing the thread.
            print(e)
        time.sleep(0.5)
def avg(q):
    """Return the arithmetic mean of the values in *q* (a sequence or
    deque), or 0 when *q* is empty.

    Fixes over the original:
    - ``reduce`` is not a builtin on Python 3; ``sum`` is the idiomatic
      (and C-speed) equivalent;
    - integer division silently truncated the mean on Python 2; the
      explicit ``float()`` forces true division on both versions.
    """
    all_data = list(q)
    if not all_data:
        return 0
    return sum(all_data) / float(len(all_data))
def percentage_usage_100(q):
    """Percentage of samples in *q* that are exactly 100, or 0 if empty.

    100 percent usage in last X seconds: count how many times we see a
    100-percent reading and compute (count / window_size) * 100.

    Bug fixed: ``count / len(all_data)`` is integer division on
    Python 2, so the original returned 0.0 unless *every* sample was
    100.  Multiplying by 100.0 first forces true division everywhere.
    """
    all_data = list(q)
    if not all_data:
        return 0
    # Generator + sum replaces the manual counting loop.
    hundreds = sum(1 for sample in all_data if sample == 100)
    return 100.0 * hundreds / len(all_data)
class App:
    """Wires the producer/consumer threads together and exposes the
    collected utilization data to the bottle route handlers.

    ``processed_items`` maps bacon serial -> {val-key -> CircularQ}; it
    is written by the consumer thread and read by the request handlers.
    NOTE(review): that shared dict is iterated here while the consumer
    thread mutates it, with no locking -- confirm this is acceptable.
    """

    def __init__(self):
        # Shared state filled by the consumer thread (see module-level
        # consumer()); read by the handlers below.
        self.processed_items = {}
        # Producer -> consumer hand-off queue.
        self.new_items_q = Queue()
        self.start_producer()
        self.start_consumer()

    def sample(self):
        # Trivial liveness endpoint (bound to /hello).
        return "Boo"

    def get_utilization_json(self):
        """Build the /utilization.json payload: one object per bacon
        carrying the rolling average and the 100%-usage percentage for
        each metric index."""
        all_bacons = []
        for bacon, all_values in self.processed_items.items():
            print(bacon)
            print(all_values)
            obj = {
                'bacon': bacon
            }
            for k, v in all_values.items():
                # Keys look like 'val-<metric_index>'.
                metric_index = k.split('-')[1]
                avg_utilization_key = 'average-utilization-' + metric_index
                hundred_utilization_key = 'hundred-percent-utilization-' + metric_index
                obj.update({
                    avg_utilization_key: avg(v.get_items()),
                    hundred_utilization_key: percentage_usage_100(v.get_items())
                })
            all_bacons.append(obj)
        return {
            'all_bacons': all_bacons
        }

    def get_utilization_csv(self):
        """Build the /utilization.csv payload: one comma-separated row
        per bacon (serial, then avg/100%-usage pairs per metric).
        NOTE(review): columns follow dict iteration order of the val
        keys, so their order is not guaranteed across bacons."""
        all_bacons = []
        for bacon, all_values in self.processed_items.items():
            print(bacon)
            print(all_values)
            obj = [bacon]
            for k, v in all_values.items():
                obj.append(str(avg(v.get_items())))
                obj.append(str(percentage_usage_100(v.get_items())))
            all_bacons.append(obj)
        return '\n'.join(','.join(row) for row in all_bacons)

    def start_producer(self):
        # Detached Python 2 'thread' module thread; no join/cleanup.
        thread.start_new_thread(producer,
                                ("producer-thread", self.new_items_q))

    def start_consumer(self):
        # Detached Python 2 'thread' module thread; no join/cleanup.
        thread.start_new_thread(consumer,
                                ("consumer-thread",
                                 self.new_items_q,
                                 self.processed_items))
class ServerApp:
    """Bottle front-end: binds the App handlers to routes and serves them."""

    def __init__(self):
        pass

    def runAsLongRunner(self):
        """Create the App (which starts the worker threads), register
        the HTTP routes, and block serving requests."""
        app = App()
        # Imperative route() calls (instead of decorators) so the bound
        # methods of this particular App instance are registered.
        bindings = (
            ("/hello", app.sample),
            ("/utilization.json", app.get_utilization_json),
            ("/utilization.csv", app.get_utilization_csv),
        )
        for path, handler in bindings:
            route(path)(handler)
        run(host='0.0.0.0', port=16080)
if __name__ == '__main__':
    # Run the web service as a background daemon named 'sausages-ws'.
    daemon = Daemonizer(ServerApp(), 'sausages-ws', write_log=True)
    # NOTE(review): "dummy" log path -- presumably Daemonizer derives the
    # real log location itself; confirm against the daemonizer module.
    daemon.setLog("dummy")
    daemon.daemonize()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment