{{ message }}

Instantly share code, notes, and snippets.

# ximeg/ ThresholdingAlgo.py

Created Apr 20, 2017
Python implementation of smoothed z-score algorithm from http://stackoverflow.com/a/22640362/6029703
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
 #!/usr/bin/env python # Implementation of algorithm from http://stackoverflow.com/a/22640362/6029703 import numpy as np import pylab def thresholding_algo(y, lag, threshold, influence): signals = np.zeros(len(y)) filteredY = np.array(y) avgFilter = *len(y) stdFilter = *len(y) avgFilter[lag - 1] = np.mean(y[0:lag]) stdFilter[lag - 1] = np.std(y[0:lag]) for i in range(lag, len(y) - 1): if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]: if y[i] > avgFilter[i-1]: signals[i] = 1 else: signals[i] = -1 filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1] avgFilter[i] = np.mean(filteredY[(i-lag):i]) stdFilter[i] = np.std(filteredY[(i-lag):i]) else: signals[i] = 0 filteredY[i] = y[i] avgFilter[i] = np.mean(filteredY[(i-lag):i]) stdFilter[i] = np.std(filteredY[(i-lag):i]) return dict(signals = np.asarray(signals), avgFilter = np.asarray(avgFilter), stdFilter = np.asarray(stdFilter)) # Data y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9, 1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3, 2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1]) # Settings: lag = 30, threshold = 5, influence = 0 lag = 30 threshold = 5 influence = 0 # Run algo with settings from above result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence) # Plot result pylab.subplot(211) pylab.plot(np.arange(1, len(y)+1), y) pylab.plot(np.arange(1, len(y)+1), result["avgFilter"], color="cyan", lw=2) pylab.plot(np.arange(1, len(y)+1), result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2) pylab.plot(np.arange(1, len(y)+1), result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2) pylab.subplot(212) pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2) pylab.ylim(-1.5, 1.5)

### JamesKBowler commented Aug 24, 2017

Yes!!! very nice. Thankyou

### delica1 commented Apr 29, 2018 • edited

Any idea how to modify this so it works with a real time data stream? I mean without recalculating the entire series each time, but instead to only recalculate the last item passed to the function.

### matthewhegarty commented Jun 16, 2018

Thanks for posting this. I noticed a bug on line 13.

It should be:

``````for i in range(lag, len(y)):
``````

(otherwise the last item in the `y` list never gets checked).

### Fux002 commented Oct 25, 2018

i think
np.std(y[0:lag], ddof=1)

you use the wrong degrees of freedom

### dlegor commented Mar 7, 2019

Thank you so much for your code!

With small changes, your code could be faster. I changed the list to arrays and used numba for optimization.
With large time series, your code takes approximately 35 s, with changes the code takes 1.5 s.

### PeiLi-Sandman commented Jun 19, 2019

Thank you for this code,

I did some modifications for the estimation of avg and std.