|
#!/usr/bin/python |
|
# -*- coding: utf-8 -*- |
|
import numpy as np |
|
import pylab |
|
|
|
|
|
def init(x, lag, threshold, influence): |
|
''' |
|
Smoothed z-score algorithm |
|
Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703 |
|
''' |
|
|
|
labels = np.zeros(lag) |
|
filtered_y = np.array(x[0:lag]) |
|
avg_filter = np.zeros(lag) |
|
std_filter = np.zeros(lag) |
|
var_filter = np.zeros(lag) |
|
|
|
avg_filter[lag - 1] = np.mean(x[0:lag]) |
|
std_filter[lag - 1] = np.std(x[0:lag]) |
|
var_filter[lag - 1] = np.var(x[0:lag]) |
|
|
|
return dict(avg=avg_filter[lag - 1], |
|
var=var_filter[lag - 1], |
|
std=std_filter[lag - 1], |
|
filtered_y=filtered_y, |
|
labels=labels) |
|
|
|
def add(esult, single_value, lag, threshold, influence): |
|
previous_avg = result['avg'] |
|
previous_var = result['var'] |
|
previous_std = result['std'] |
|
filtered_y = result['filtered_y'] |
|
labels = result['labels'] |
|
|
|
if abs(single_value - previous_avg) > threshold * previous_std: |
|
if single_value > previous_avg: |
|
labels = np.append(labels, 1) |
|
else: |
|
labels = np.append(labels, -1) |
|
|
|
# calculate the new filtered element using the influence factor |
|
filtered_y = np.append(filtered_y, influence * single_value |
|
+ (1 - influence) * filtered_y[-1]) |
|
else: |
|
labels = np.append(labels, 0) |
|
filtered_y = np.append(filtered_y, single_value) |
|
|
|
# update avg as sum of the previuos avg + the lag * (the new calculated item - calculated item at position (i - lag)) |
|
current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1] |
|
- filtered_y[len(filtered_y) - lag - 1]) |
|
|
|
# update variance as the previuos element variance + 1 / lag * new recalculated item - the previous avg - |
|
current_var_filter = previous_var + 1. / lag * ((filtered_y[-1] |
|
- previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1 |
|
- lag] - previous_avg) ** 2 - (filtered_y[-1] |
|
- filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag) # the recalculated element at pos (lag) - avg of the previuos - new recalculated element - recalculated element at lag pos .... |
|
|
|
# calculate standard deviation for current element as sqrt (current variance) |
|
current_std_filter = np.sqrt(current_var_filter) |
|
|
|
return dict(avg=current_avg_filter, |
|
var=current_var_filter, |
|
std=current_std_filter, |
|
filtered_y=filtered_y[1:], |
|
labels=labels) |
|
|
|
lag = 30 |
|
threshold = 5 |
|
influence = 0 |
|
|
|
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9, |
|
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3, |
|
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1]) |
|
|
|
# Run algo with settings from above |
|
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence) |
|
|
|
for i in y[lag:]: |
|
result = add(result, i, lag, threshold, influence) |
|
|
|
# Plot result |
|
pylab.subplot(211) |
|
pylab.plot(np.arange(1, len(y) + 1), y) |
|
pylab.subplot(212) |
|
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red', |
|
lw=2) |
|
pylab.ylim(-1.5, 1.5) |
|
pylab.show() |