Skip to content

Instantly share code, notes, and snippets.

@vsmelov
Created June 26, 2018 22:53
Show Gist options
  • Save vsmelov/1cf5ad170a06301eb89e53e2b4c8dea8 to your computer and use it in GitHub Desktop.
Save vsmelov/1cf5ad170a06301eb89e53e2b4c8dea8 to your computer and use it in GitHub Desktop.
from window_avg_std import window_avg_std
def peak_detection(window, threshold, influence):
    """Streaming peak detector based on the smoothed z-score algorithm.

    https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/43512887#43512887

    This is a coroutine-style generator: prime it with ``next()``, then
    ``send()`` one sample at a time. Each ``send()`` returns the signal for
    the sample just sent:

    * ``1``  -- sample is above the running average by more than
      ``threshold * std``
    * ``-1`` -- sample is below the running average by more than
      ``threshold * std``
    * ``0``  -- no peak (or the detector is still warming up)

    Args:
        window: size of the sliding window handed to ``window_avg_std``.
        threshold: multiplier applied to the running std to decide whether
            a sample deviates enough to be a peak.
        influence: weight of a detected peak in the value that is fed back
            into the running statistics; the previous (already smoothed)
            value gets the remaining ``1 - influence``.
    """
    sample = yield  # the first sample arrives through the first send()
    last_fed = sample  # last value pushed into the statistics (smoothed)
    mean, spread = sample, 0
    seen = 0  # number of samples already pushed into the statistics
    tracker = window_avg_std(window)
    next(tracker)  # advance the helper generator to its first yield

    while True:
        # NOTE(review): the gate opens once ``window - 1`` samples have been
        # fed to the statistics; the original comment said "detection starts
        # after first window'th elements" -- confirm this is the intended
        # warm-up length.
        if abs(sample - mean) > threshold * spread and seen >= window - 1:
            if sample > mean:
                flag = 1
            else:
                flag = -1
            # Dampen the peak so it only partially shifts the running stats.
            sample = influence * sample + (1 - influence) * last_fed
        else:
            flag = 0

        mean, spread = tracker.send(sample)
        last_fed = sample
        seen += 1
        sample = yield flag
if __name__ == '__main__':
    # Demo: run the detector over a sample series and print the signal
    # produced for every element.
    data = [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
            1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
            2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1]
    detector = peak_detection(window=3, threshold=3, influence=0.2)
    next(detector)  # prime the generator so it can accept send()
    # Feed the samples in chronological order. The previous ``data.pop()``
    # loop took elements from the END of the list, so the detector saw the
    # series backwards (and the list was emptied as a side effect).
    for elem in data:
        signal = detector.send(elem)
        print('elem: {}, signal: {}'.format(elem, signal))
from queue import Queue
def window_avg_std(window=3):
    """Stream the mean and standard deviation over a sliding window.

    Coroutine-style generator: prime it with ``next()``, then ``send()`` one
    value at a time. Every ``send()`` returns a tuple ``(avg, std)`` computed
    over the most recent values -- at most ``window`` of them, including the
    value just sent.

    ``std`` is the population standard deviation,
    ``sqrt(mean(x**2) - mean(x)**2)``, maintained from running sums so each
    update costs O(1).

    Args:
        window: maximum number of most recent values to keep (>= 1).

    Raises:
        ValueError: if ``window`` is smaller than 1 (raised when the
            generator is primed).
    """
    # Local import keeps this block self-contained; a deque is the
    # lightweight FIFO for a single-threaded sliding window.
    from collections import deque

    if window < 1:
        raise ValueError('window must be at least 1, got {!r}'.format(window))

    buf = deque()
    elem_sum1 = 0  # running sum of the values in the window
    elem_sum2 = 0  # running sum of the squared values in the window
    elem = yield
    while True:
        buf.append(elem)
        elem_sum1 += elem
        elem_sum2 += elem * elem
        if len(buf) > window:
            # Evict the oldest value and remove its contribution.
            x = buf.popleft()
            elem_sum1 -= x
            elem_sum2 -= x * x
        count = len(buf)
        avg = elem_sum1 / count
        variance = elem_sum2 / count - avg * avg
        # Floating-point cancellation can push the variance slightly below
        # zero for (near-)constant windows; clamp before taking the root.
        std = max(variance, 0.0) ** 0.5
        elem = yield avg, std
if __name__ == '__main__':
    # Demo: push the integers 1..6 through a default-sized window and print
    # the statistics returned after each one.
    stats = window_avg_std()
    next(stats)  # advance to the first yield so send() is accepted
    for i in range(1, 7):
        print('i={} v={}'.format(i, stats.send(i)))
@vsmelov
Copy link
Author

vsmelov commented Jun 27, 2018

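The std calculation in window_avg_std still needs to be corrected: `(elem_sum2 - elem_sum1) / n` is not a standard deviation.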

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment