Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Python implementation of smoothed z-score algorithm from
#!/usr/bin/env python
# Implementation of algorithm from
import numpy as np
import pylab
def thresholding_algo(y, lag, threshold, influence):
signals = np.zeros(len(y))
filteredY = np.array(y)
avgFilter = [0]*len(y)
stdFilter = [0]*len(y)
avgFilter[lag - 1] = np.mean(y[0:lag])
stdFilter[lag - 1] = np.std(y[0:lag])
for i in range(lag, len(y) - 1):
if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
if y[i] > avgFilter[i-1]:
signals[i] = 1
signals[i] = -1
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
avgFilter[i] = np.mean(filteredY[(i-lag):i])
stdFilter[i] = np.std(filteredY[(i-lag):i])
signals[i] = 0
filteredY[i] = y[i]
avgFilter[i] = np.mean(filteredY[(i-lag):i])
stdFilter[i] = np.std(filteredY[(i-lag):i])
return dict(signals = np.asarray(signals),
avgFilter = np.asarray(avgFilter),
stdFilter = np.asarray(stdFilter))
# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0
# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)
# Plot result
pylab.plot(np.arange(1, len(y)+1), y)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"], color="cyan", lw=2)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
Copy link

JamesKBowler commented Aug 24, 2017

Yes!!! very nice. Thankyou

Copy link

delica1 commented Apr 29, 2018

Any idea how to modify this so it works with a real time data stream? I mean without recalculating the entire series each time, but instead to only recalculate the last item passed to the function.

Copy link

matthewhegarty commented Jun 16, 2018

Thanks for posting this. I noticed a bug on line 13.

It should be:

for i in range(lag, len(y)):

(otherwise the last item in the y list never gets checked).

Copy link

Fux002 commented Oct 25, 2018

i think
np.std(y[0:lag], ddof=1)

you use the wrong degrees of freedom

Copy link

dlegor commented Mar 7, 2019

Thank you so much for your code!

With small changes, your code could be faster. I changed the list to arrays and used numba for optimization.
With large time series, your code takes approximately 35 s, with changes the code takes 1.5 s.

Check here:

Copy link

PeiLi-Sandman commented Jun 19, 2019

Thank you for this code,

I did some modifications for the estimation of avg and std.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment