Skip to content

Instantly share code, notes, and snippets.

@MatteoBattilana
Last active March 11, 2022 10:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save MatteoBattilana/95795e68129cda8f10f91d96dc4ebe72 to your computer and use it in GitHub Desktop.
Save MatteoBattilana/95795e68129cda8f10f91d96dc4ebe72 to your computer and use it in GitHub Desktop.
Robust peak detection algorithm (using z-scores), modified to split up the initialization and the computational part.

This is a Python implementation of the Robust peak detection algorithm.

The initialization and the computational parts are split up, only the filtered_y array has been kept, that has a maximum size equals to lag, so there is no memory increase. (Results are the same of above answers). In order to plot the graph, also the labels array is kept.

#!/usr/bin/python
# -*- coding: utf-8 -*-
import numpy as np
import pylab
def init(x, lag, threshold, influence):
'''
Smoothed z-score algorithm
Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
'''
labels = np.zeros(lag)
filtered_y = np.array(x[0:lag])
avg_filter = np.zeros(lag)
std_filter = np.zeros(lag)
var_filter = np.zeros(lag)
avg_filter[lag - 1] = np.mean(x[0:lag])
std_filter[lag - 1] = np.std(x[0:lag])
var_filter[lag - 1] = np.var(x[0:lag])
return dict(avg=avg_filter[lag - 1],
var=var_filter[lag - 1],
std=std_filter[lag - 1],
filtered_y=filtered_y,
labels=labels)
def add(esult, single_value, lag, threshold, influence):
previous_avg = result['avg']
previous_var = result['var']
previous_std = result['std']
filtered_y = result['filtered_y']
labels = result['labels']
if abs(single_value - previous_avg) > threshold * previous_std:
if single_value > previous_avg:
labels = np.append(labels, 1)
else:
labels = np.append(labels, -1)
# calculate the new filtered element using the influence factor
filtered_y = np.append(filtered_y, influence * single_value
+ (1 - influence) * filtered_y[-1])
else:
labels = np.append(labels, 0)
filtered_y = np.append(filtered_y, single_value)
# update avg as sum of the previuos avg + the lag * (the new calculated item - calculated item at position (i - lag))
current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
- filtered_y[len(filtered_y) - lag - 1])
# update variance as the previuos element variance + 1 / lag * new recalculated item - the previous avg -
current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
- previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
- lag] - previous_avg) ** 2 - (filtered_y[-1]
- filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag) # the recalculated element at pos (lag) - avg of the previuos - new recalculated element - recalculated element at lag pos ....
# calculate standard deviation for current element as sqrt (current variance)
current_std_filter = np.sqrt(current_var_filter)
return dict(avg=current_avg_filter,
var=current_var_filter,
std=current_std_filter,
filtered_y=filtered_y[1:],
labels=labels)
lag = 30
threshold = 5
influence = 0
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])
# Run algo with settings from above
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)
for i in y[lag:]:
result = add(result, i, lag, threshold, influence)
# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment