Skip to content

Instantly share code, notes, and snippets.

Created April 2, 2013 18:31
Show Gist options
  • Save astanway/5294861 to your computer and use it in GitHub Desktop.
Save astanway/5294861 to your computer and use it in GitHub Desktop.
from pandas import stats, Series
import numpy as np
from numpy import linalg as numpy_linalg
from numpy import mean as numpy_mean
from numpy import sqrt as numpy_sqrt
from numpy import round as numpy_round
from scipy import stats as sci_stats
from scipy import array as scipy_array
from scipy import std as scipy_std
from time import time
import settings
"""
This is no man's land. Do anything you want in here,
as long as you return a boolean that determines whether the input
timeseries is aberrant or not.

The only thing that calls these functions is the analyze() function.
Be sure to change the algorithm appropriately within that function.

All algorithm explanations should begin with,
"A timeseries is aberrant if..."
"""
def grubbs(timeseries):
    """
    A timeseries is aberrant if the Z score of the average of its last three
    datapoints is greater than the Grubbs' test critical value.

    timeseries -- sequence of datapoints; element [1] of each datapoint is
                  read as its value.

    Returns the amount by which the Z score exceeds the critical value,
    rounded to two decimals, or False when the series is not aberrant or is
    too short / too flat to be tested.
    """
    series = np.array([x[1] for x in timeseries])
    len_series = len(series)

    # The tail average needs three datapoints, and the t-distribution below
    # needs len_series - 2 > 0 degrees of freedom.
    if len_series < 3:
        return False

    stdDev = np.std(series)
    # A constant series has no spread; bail out instead of dividing by zero.
    if stdDev == 0:
        return False

    mean = numpy_mean(series)
    # numpy_mean always averages in floating point, so integer-valued series
    # are not truncated by integer division.
    tail_average = numpy_mean(series[-3:])
    z_score = (tail_average - mean) / stdDev

    threshold = sci_stats.t.isf(.05 / (2 * len_series), len_series - 2)
    threshold_squared = threshold * threshold
    grubbs_score = (
        ((len_series - 1) / numpy_sqrt(len_series))
        * numpy_sqrt(threshold_squared / (len_series - 2 + threshold_squared))
    )

    if z_score > grubbs_score:
        return numpy_round(z_score - grubbs_score, 2)

    return False
def stddev_from_moving_average(timeseries):
    """
    A timeseries is aberrant if the absolute value of the datapoint at
    position WINDOW minus the exponentially weighted moving average at that
    position is greater than three exponentially weighted standard
    deviations.

    Returns the rounded excess over the (rounded) three-sigma limit, or
    False when the datapoint is within the limit.
    """
    # NOTE(review): stats.moments.ewma/ewmstd and Series.iget are legacy
    # pandas APIs -- confirm the pandas version this module is pinned to.
    values = Series([point[1] for point in timeseries])
    moving_average = stats.moments.ewma(values, com=15)
    moving_std = stats.moments.ewmstd(values, com=15)

    deviation = abs(values.iget(WINDOW) - moving_average.iget(WINDOW))
    limit = 3 * moving_std.iget(WINDOW)

    if deviation > limit:
        return round(deviation - round(limit))

    return False
def linear_regression(timeseries):
    """
    A timeseries is aberrant if the average of the last three residuals
    from a least squares line fitted to the series is more than three
    standard deviations of the residuals away from zero.

    timeseries -- sequence of datapoints; element [0] of each datapoint is
                  used as the x coordinate and element [1] as the y value.

    Returns the tail residual average minus the residual standard deviation
    when the series is aberrant, otherwise False.
    """
    # Three residuals are needed to form the tail average.
    if len(timeseries) < 3:
        return False

    x = np.array([t[0] for t in timeseries])
    y = np.array([t[1] for t in timeseries])

    # Fit y = m * x + c by least squares.
    A = np.vstack([x, np.ones(len(x))]).T
    m, c = numpy_linalg.lstsq(A, y)[0]

    # Distance of every observed value from the fitted line.
    residuals = y - (m * x + c)

    std_dev = np.std(residuals)
    tail_avg = (residuals[-1] + residuals[-2] + residuals[-3]) / 3

    # The round() checks discard fits whose residuals are negligibly small.
    if abs(tail_avg) > std_dev * 3 and round(std_dev) != 0 and round(tail_avg) != 0:
        return tail_avg - std_dev

    return False
def mean_subtraction_cumulation(timeseries):
    """
    A timeseries is aberrant if, after subtracting the mean of the leading
    datapoints from every datapoint, the value at position WINDOW is farther
    than three standard deviations of those leading datapoints from zero.

    Falsy values in the series are treated as 0. Returns the rounded,
    mean-subtracted datapoint when aberrant, otherwise False.
    """
    values = Series([point[1] or 0 for point in timeseries])

    # Everything except the trailing WINDOW datapoints forms the baseline.
    baseline = slice(0, len(values) - WINDOW)
    centered = values - values[baseline].mean()

    if abs(centered.iget(WINDOW)) > 3 * centered[baseline].std():
        return round(centered.iget(WINDOW))

    return False
# Detectors evaluated by run_selected_algorithm(); each one takes the
# timeseries and returns either False or a numeric score.
# NOTE(review): the original entries of this list were lost in this copy;
# it now lists every detector defined above -- confirm the intended set.
algorithms = [
    grubbs,
    stddev_from_moving_average,
    linear_regression,
    mean_subtraction_cumulation,
]
def run_selected_algorithm(timeseries):
    """
    Filter timeseries and run selected algorithm.

    timeseries -- sequence of datapoints; element [0] of each datapoint is
                  compared against time() and element [1] is its value.

    Returns False when the series is filtered out (empty, too short, stale,
    incomplete or silent); otherwise the mean of the results returned by
    every function in ``algorithms``.
    """
    # Get rid of short series (an empty one cannot be indexed below)
    if len(timeseries) == 0 or len(timeseries) < MIN_TOLERABLE_LENGTH:
        return False

    # Get rid of stale series
    if time() - timeseries[-1][0] > STALE_PERIOD:
        return False

    # Get rid of incomplete series
    duration = timeseries[-1][0] - timeseries[0][0]
    if duration < FULL_DURATION:
        return False

    # Get rid of empty series
    total = sum(
        point[1]
        for i, point in enumerate(timeseries)
        if i < MAX_TOLERABLE_SILENCE
    )
    if total == 0:
        return False

    ensemble = [algorithm(timeseries) for algorithm in algorithms]
    # With no algorithms configured there is nothing to average.
    if not ensemble:
        return False

    return sum(ensemble) / len(ensemble)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment