Skip to content

Instantly share code, notes, and snippets.

@astanway
Created April 2, 2013 18:31
Show Gist options
  • Save astanway/5294861 to your computer and use it in GitHub Desktop.
Save astanway/5294861 to your computer and use it in GitHub Desktop.
from pandas import stats, Series
import numpy as np
from numpy import linalg as numpy_linalg
from numpy import mean as numpy_mean
from numpy import sqrt as numpy_sqrt
from numpy import round as numpy_round
from scipy import stats as sci_stats
from scipy import array as scipy_array
from scipy import std as scipy_std
from time import time
import settings
"""
This is no man's land. Do anything you want in here,
as long as each algorithm returns False when the input timeseries
is not aberrant, and a numeric score when it is.
The only thing that calls these functions is the
analyze() function in analysis_manager.py. Be sure to
change the algorithm appropriately within that function.
All algorithm explanations should begin with,
"A timeseries is aberrant if..."
"""
# Position of the datapoint under test inside a series; -1 selects the last
# (most recent) element. Read by stddev_from_moving_average and
# mean_subtraction_cumulation.
WINDOW = -1
def grubbs(timeseries):
    """
    A timeseries is aberrant if the average of its last three datapoints
    lies above the series mean by more standard deviations than the
    Grubbs' test critical value for the series length.

    The critical value is derived from the t-distribution with
    alpha = 0.05 / (2 * N) and N - 2 degrees of freedom. Only upward
    deviations are flagged: the z-score is compared signed, not as an
    absolute value.

    timeseries: sequence of (timestamp, value) pairs.

    Returns the amount by which the z-score exceeds the critical value,
    rounded to two decimals, or False when the series is not aberrant,
    has fewer than three datapoints, or has zero standard deviation.
    """
    # Values are cast to float so the tail average is never integer-divided.
    # np.* is used directly: the NumPy aliases in the scipy namespace
    # (scipy.array, scipy.std) are deprecated.
    series = np.array([x[1] for x in timeseries], dtype=float)
    len_series = len(series)

    # The tail average needs three points, and the t-distribution needs
    # len_series - 2 > 0 degrees of freedom.
    if len_series < 3:
        return False

    std_dev = np.std(series)
    # A constant series has no spread, so a z-score cannot be formed.
    if std_dev == 0:
        return False

    mean = numpy_mean(series)
    tail_average = (series[-1] + series[-2] + series[-3]) / 3
    z_score = (tail_average - mean) / std_dev

    threshold = sci_stats.t.isf(.05 / (2 * len_series), len_series - 2)
    threshold_squared = threshold * threshold
    grubbs_score = ((len_series - 1) / numpy_sqrt(len_series)) * numpy_sqrt(
        threshold_squared / (len_series - 2 + threshold_squared))

    if z_score > grubbs_score:
        return numpy_round(z_score - grubbs_score, 2)
    return False
def stddev_from_moving_average(timeseries):
    """
    A timeseries is aberrant if the absolute value of the datapoint at
    position WINDOW minus the exponentially weighted moving average is
    greater than three exponentially weighted standard deviations.

    Both moving statistics use a center of mass of 15.

    timeseries: sequence of (timestamp, value) pairs.

    Returns round(deviation - round(3 * std)) when aberrant, otherwise
    False. An empty series returns False. Note that the rounded score can
    itself be 0 when the deviation barely exceeds the threshold.
    """
    values = [x[1] for x in timeseries]
    if not values:
        return False
    series = Series(values)

    if hasattr(series, "ewm"):
        # Current pandas API; pandas.stats.moments no longer exists there.
        weighted = series.ewm(com=15)
        exp_average = weighted.mean()
        std_dev = weighted.std()
    else:
        # Legacy pandas releases that predate Series.ewm.
        exp_average = stats.moments.ewma(series, com=15)
        std_dev = stats.moments.ewmstd(series, com=15)

    # .values gives positional access on every pandas version
    # (Series.iget has been removed).
    deviation = abs(series.values[WINDOW] - exp_average.values[WINDOW])
    threshold = 3 * std_dev.values[WINDOW]

    if deviation > threshold:
        return round(deviation - round(threshold))
    return False
def linear_regression(timeseries):
    """
    A timeseries is aberrant if the average residual of its last three
    datapoints, measured against a least-squares line fitted through the
    whole series, is larger in magnitude than three standard deviations
    of all residuals.

    Series whose residual standard deviation or tail average rounds to
    zero are never flagged.

    timeseries: sequence of (timestamp, value) pairs.

    Returns tail_avg - std_dev when aberrant, otherwise False. A series
    with fewer than three datapoints returns False.
    """
    # Three points are required for the tail average below.
    if len(timeseries) < 3:
        return False

    x = np.array([t[0] for t in timeseries])
    y = np.array([t[1] for t in timeseries])

    # Design matrix for y = m * x + c.
    A = np.vstack([x, np.ones(len(x))]).T
    # rcond=-1 keeps the historical singular-value cutoff on every NumPy
    # version and avoids the FutureWarning about the changed default.
    m, c = numpy_linalg.lstsq(A, y, rcond=-1)[0]

    residuals = y - (m * x + c)
    std_dev = np.std(residuals)
    tail_avg = (residuals[-1] + residuals[-2] + residuals[-3]) / 3

    if abs(tail_avg) > std_dev * 3 and round(std_dev) != 0 and round(tail_avg) != 0:
        return tail_avg - std_dev
    return False
def mean_subtraction_cumulation(timeseries):
    """
    A timeseries is aberrant if the datapoint at position WINDOW, after
    subtracting the mean of the datapoints that precede it, is farther
    than three standard deviations of those preceding datapoints from
    zero.

    Falsy values (None, 0) are treated as 0.

    timeseries: sequence of (timestamp, value) pairs.

    Returns the rounded, mean-subtracted value of the datapoint under test
    when aberrant, otherwise False. An empty series returns False.
    """
    values = [x[1] if x[1] else 0 for x in timeseries]
    if not values:
        return False
    series = Series(values)

    # The baseline is everything before the datapoint under test, so that
    # point cannot pull the mean and spread toward itself.
    # NOTE(review): assumes WINDOW is a negative index (it is -1 here).
    baseline = series[:WINDOW]

    # .values gives positional access on every pandas version
    # (Series.iget has been removed).
    latest = series.values[WINDOW] - baseline.mean()

    # With fewer than two baseline points std() is NaN and the comparison
    # is False, so very short series are never flagged.
    if abs(latest) > 3 * baseline.std():
        return round(latest)
    return False
# NOTE(review): SELECTED_ALGORITHM is not referenced by any code visible in
# this file; run_selected_algorithm iterates over `algorithms` instead.
SELECTED_ALGORITHM = grubbs
# Thresholds read from the project settings module. They are compared against
# timestamp differences, series length, and an element index in
# run_selected_algorithm.
STALE_PERIOD = settings.STALE_PERIOD
FULL_DURATION = settings.FULL_DURATION
MIN_TOLERABLE_LENGTH = settings.MIN_TOLERABLE_LENGTH
MAX_TOLERABLE_SILENCE = settings.MAX_TOLERABLE_SILENCE
# Every function listed here is called on each series that passes the filters;
# their return values are averaged into a single result.
algorithms = [
grubbs,
mean_subtraction_cumulation,
stddev_from_moving_average,
linear_regression
]
def run_selected_algorithm(timeseries):
    """
    Filter timeseries and run every algorithm in `algorithms` on it.

    timeseries: sequence of (timestamp, value) pairs.

    Returns False when the series is rejected by a filter (too short,
    stale, not spanning FULL_DURATION, or summing to zero over its first
    MAX_TOLERABLE_SILENCE datapoints) or when the algorithm results cannot
    be averaged. Otherwise returns the mean of the algorithm results, where
    an algorithm that returned False contributes 0.
    """
    # Get rid of short series (an empty one would also break the
    # timeseries[-1] lookups below)
    if len(timeseries) == 0 or len(timeseries) < MIN_TOLERABLE_LENGTH:
        return False

    # Get rid of stale series
    if time() - timeseries[-1][0] > STALE_PERIOD:
        return False

    # Get rid of incomplete series
    duration = timeseries[-1][0] - timeseries[0][0]
    if duration < FULL_DURATION:
        return False

    # Get rid of empty series
    total = sum(
        point[1]
        for i, point in enumerate(timeseries)
        if i < MAX_TOLERABLE_SILENCE
    )
    if total == 0:
        return False

    ensemble = [algorithm(timeseries) for algorithm in algorithms]

    try:
        return sum(ensemble) / len(ensemble)
    except (TypeError, ValueError, ArithmeticError):
        # A non-numeric algorithm result or an empty `algorithms` list:
        # treat the series as not aberrant.
        return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment