Skip to content

Instantly share code, notes, and snippets.

@ximeg
Created April 20, 2017 07:20
Show Gist options
  • Star 46 You must be signed in to star a gist
  • Fork 14 You must be signed in to fork a gist
  • Save ximeg/587011a65d05f067a29ce9c22894d1d2 to your computer and use it in GitHub Desktop.
Save ximeg/587011a65d05f067a29ce9c22894d1d2 to your computer and use it in GitHub Desktop.
Python implementation of smoothed z-score algorithm from http://stackoverflow.com/a/22640362/6029703
#!/usr/bin/env python
# Implementation of algorithm from http://stackoverflow.com/a/22640362/6029703
import numpy as np
import pylab
def thresholding_algo(y, lag, threshold, influence):
signals = np.zeros(len(y))
filteredY = np.array(y)
avgFilter = [0]*len(y)
stdFilter = [0]*len(y)
avgFilter[lag - 1] = np.mean(y[0:lag])
stdFilter[lag - 1] = np.std(y[0:lag])
for i in range(lag, len(y) - 1):
if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
if y[i] > avgFilter[i-1]:
signals[i] = 1
else:
signals[i] = -1
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
avgFilter[i] = np.mean(filteredY[(i-lag):i])
stdFilter[i] = np.std(filteredY[(i-lag):i])
else:
signals[i] = 0
filteredY[i] = y[i]
avgFilter[i] = np.mean(filteredY[(i-lag):i])
stdFilter[i] = np.std(filteredY[(i-lag):i])
return dict(signals = np.asarray(signals),
avgFilter = np.asarray(avgFilter),
stdFilter = np.asarray(stdFilter))
# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])
# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0
# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)
# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"], color="cyan", lw=2)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)
pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
@delica1
Copy link

delica1 commented Apr 29, 2018

Any idea how to modify this so it works with a real time data stream? I mean without recalculating the entire series each time, but instead to only recalculate the last item passed to the function.

@matthewhegarty
Copy link

Thanks for posting this. I noticed a bug on line 13.

It should be:

for i in range(lag, len(y)):

(otherwise the last item in the y list never gets checked).

@Fux002
Copy link

Fux002 commented Oct 25, 2018

i think
np.std(y[0:lag], ddof=1)

you use the wrong degrees of freedom

@dlegor
Copy link

dlegor commented Mar 7, 2019

Thank you so much for your code!

With small changes, your code could be faster. I changed the list to arrays and used numba for optimization.
With large time series, your code takes approximately 35 s, with changes the code takes 1.5 s.

Check here:
https://gist.github.com/dlegor/286dd5cd5ecc317c34ed81aaed4a2b37

@PeiLi-Sandman
Copy link

Thank you for this code,

I did some modifications for the estimation of avg and std.

https://gist.github.com/PeiLi-Sandman/d67baad835a516481e6ca124a08840f2

@traderblakeq
Copy link

traderblakeq commented Nov 13, 2022

Very nice - Im trying to implement it with a Dataframe containing a pandas Series of Close prices.

df.Close.to_numpy() --> seems to work fine with thresholding_algo

but It seems it have issue with the plotting part, any tips on how to that part working ?

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment