Skip to content

Instantly share code, notes, and snippets.

@jo-makar
Last active March 27, 2018 11:55
Show Gist options
  • Save jo-makar/d20674561f29e5276361fa718e085aaa to your computer and use it in GitHub Desktop.
Save jo-makar/d20674561f29e5276361fa718e085aaa to your computer and use it in GitHub Desktop.
Thread-safe metrics storage that calculates rate of change using linear regression
import collections
import datetime
import logging
import threading
import time
class MetricsThread(threading.Thread):
    '''
    Thread-safe metrics storage that calculates rate of change using linear
    regression (ie line-fitting).

    Values can be stored by (external) increment calls and/or by periodic
    callback functions.  Once per period the thread polls the callbacks,
    snapshots the current values into a bounded history and fits a line
    through each metric's history; the slope of that line is the metric's
    rate of change.

    Keyword arguments:
    callbacks -- metrics to be also determined periodically via callbacks,
                 each callback is expected to return a (name, value) tuple
    period    -- period as a timedelta object
    histlen   -- history length (maximum number of snapshots retained)
    '''

    # Looked up explicitly so the class does not depend on a module-level
    # `logger` global being defined elsewhere.
    _log = logging.getLogger(__name__)

    def __init__(self, callbacks=None, period=datetime.timedelta(minutes=10), histlen=10):
        threading.Thread.__init__(self)
        self.daemon = True

        # A literal [] default would be one list shared by every instance;
        # a caller-supplied list is still kept by reference.
        self.callbacks = [] if callbacks is None else callbacks
        self.period = period

        # Current values
        self.current = {}
        self.curlock = threading.Lock()

        # Historical values: [timestamp, {name: value}] pairs, oldest first
        self.history = collections.deque(maxlen=histlen)
        self.histlock = threading.Lock()

        # Rates of change
        self.rates = {}
        self.ratelock = threading.Lock()

    def increment(self, name):
        '''Increment value associated with name (starting from zero if unseen)'''
        with self.curlock:
            self.current[name] = self.current.get(name, 0) + 1

    def rate(self, name):
        '''
        Return rate of change (as change/second) of value associated with name.
        This rate is calculated using linear regression against the historical values.
        Returns None when no rate has been calculated for name (yet).
        '''
        with self.ratelock:
            return self.rates.get(name)

    @staticmethod
    def _linreg(x, y):
        '''
        Linear regression, returns a, b of y=a+bx that best fits the samples.
        linreg(x=[43,21,25,42,57,59], y=[99,65,79,75,87,81]) => a=65.1416, b=0.3852

        Raises ValueError if x and y differ in length, if there are fewer
        than three samples, or if every x is identical (no line can be fit).
        '''
        if len(x) != len(y):
            raise ValueError('x and y must have the same length')
        n = len(x)
        if n <= 2:
            raise ValueError('at least three samples are required')

        sum_x = sum(x)
        sum_y = sum(y)
        sum_x2 = sum(i * i for i in x)
        sum_xy = sum(i * j for i, j in zip(x, y))

        denom = float(n * sum_x2 - sum_x * sum_x)
        if denom == 0:
            raise ValueError('x values are all identical')

        a = (sum_y * sum_x2 - sum_x * sum_xy) / denom
        b = (n * sum_xy - sum_x * sum_y) / denom
        return a, b

    def _poll_callbacks(self):
        '''Store the (name, value) returned by each callback as a current value.'''
        for cb in self.callbacks:
            # Best effort: one misbehaving callback must neither stop the
            # others nor kill the thread, but the failure is reported.
            try:
                name, val = cb()
                with self.curlock:
                    self.current[name] = val
            except Exception:
                self._log.exception('metrics callback failed: %r', cb)

    def _record_snapshot(self):
        '''Append a timestamped copy of the current values to the history.'''
        with self.histlock:
            with self.curlock:
                self.history.append([datetime.datetime.now(), dict(self.current)])
                self._log.info('metrics history recorded: %r', self.current)

    def _update_rates(self):
        '''Recalculate the rate of change of every metric in the latest snapshot.'''
        # Snapshot dicts are private copies, so only the deque itself needs the lock
        with self.histlock:
            snapshots = list(self.history)

        rates = {}
        if snapshots:
            for key in snapshots[-1][1]:
                xs = []
                ys = []
                for stamp, values in snapshots:
                    if key in values:
                        # Seconds since the epoch (ie Unix time); mktime() alone
                        # drops the sub-second part, so add it back
                        xs.append(time.mktime(stamp.timetuple()) + stamp.microsecond / 1e6)
                        ys.append(values[key])

                if len(xs) <= 2:
                    continue

                # Shift the time values (to avoid rounding errors)
                x0 = xs[0]
                xs = [x - x0 for x in xs]

                try:
                    _, slope = self._linreg(xs, ys)
                except (TypeError, ValueError):
                    # Identical timestamps or non-numeric values: no rate for this metric
                    continue
                rates[key] = slope

        with self.ratelock:
            self.rates = rates

        s = '{' + (', '.join(['%s: %.3f' % (k, v) for k, v in rates.items()])) + '}'
        self._log.info('metrics rates calculated: %s', s)

    def _sample_once(self):
        '''Run one sampling pass: poll callbacks, record history, update rates.'''
        self._poll_callbacks()
        self._record_snapshot()
        self._update_rates()

    def run(self):
        '''Thread body: sleep for one period, then sample, forever.'''
        while True:
            time.sleep(self.period.total_seconds())
            self._sample_once()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment