Last active
April 4, 2019 12:05
-
-
Save spikar/baf14947fbd7893beab8f1ee93259100 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
from itertools import count | |
import matplotlib | |
matplotlib.use('TkAgg') | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import pandas as pd | |
import collections | |
from matplotlib import style | |
style.use('fivethirtyeight') | |
# 3. Let's define some use-case-specific UDFs (User Defined Functions)
def moving_average(data, window_size):
    """ Smooths a one dimensional sequence with a uniform moving average.

    The sequence is convolved with a kernel of ``int(window_size)`` equal
    weights, each worth ``1 / window_size``. The convolution runs in NumPy's
    'same' mode, so the result is as long as the longer of the two inputs and
    the values close to either end are averaged against implicit zeros.

    Args:
    -----
        data (pandas.Series): values to smooth
        window_size (int): rolling window size

    Returns:
    --------
        ndarray holding the smoothed values

    References:
    ------------
    [1] Wikipedia, "Convolution", http://en.wikipedia.org/wiki/Convolution.
    [2] API Reference: https://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html
    """
    n_taps = int(window_size)
    # Equal weights that sum to one when window_size is a whole number.
    kernel = np.ones(n_taps) / float(window_size)
    return np.convolve(data, kernel, mode='same')
def explain_anomalies(y, window_size, sigma=1.0):
    """ Finds anomalies using one stationary standard deviation for the whole series.

    A point is reported when it lies more than ``sigma`` standard deviations
    above or below the moving average at the same position. The standard
    deviation is computed once, over the residual (observation minus moving
    average) of the entire series.

    Args:
    -----
        y (pandas.Series): observed values
        window_size (int): rolling window size used for the moving average
        sigma (int): number of standard deviations that defines the band

    Returns:
    --------
        a dict with two entries:
        'standard_deviation': the residual standard deviation rounded to 3 decimals
        'anomalies_dict': an OrderedDict mapping the position of each anomalous
        point (counted from 0) to its value
    """
    smoothed = moving_average(y, window_size).tolist()
    residual = y - smoothed
    # A single spread value for the full residual fixes the band width.
    std = np.std(residual)
    band = sigma * std

    flagged = collections.OrderedDict()
    for position, (y_i, avg_i) in enumerate(zip(y, smoothed)):
        if (y_i > avg_i + band) | (y_i < avg_i - band):
            flagged[position] = y_i

    return {'standard_deviation': round(std, 3),
            'anomalies_dict': flagged}
def explain_anomalies_rolling_std(y, window_size, sigma=1.0):
    """ Finds anomalies using a rolling standard deviation of the residual.

    A point is reported when it lies more than ``sigma`` rolling standard
    deviations above or below the moving average at the same position. The
    rolling standard deviation is taken over ``window_size`` residuals
    (observation minus moving average) and rounded to 3 decimals.

    Args:
    -----
        y (pandas.Series): observed values
        window_size (int): rolling window size, used for both the moving
            average and the rolling standard deviation
        sigma (int): number of standard deviations that defines the band

    Returns:
    --------
        a dict with two entries:
        'stationary standard_deviation': standard deviation of the whole
        residual rounded to 3 decimals (reported for reference only, it is not
        used to flag points)
        'anomalies_dict': an OrderedDict mapping the position of each anomalous
        point (counted from 0) to its value
    """
    avg = moving_average(y, window_size)
    residual = y - avg

    # The first ``window_size - 1`` rolling values are NaN because their
    # windows are incomplete.
    rolling_std = residual.rolling(window_size).std()
    # Fill the NaN entries with the first full-window value. The lookup is
    # positional (``.iloc``); the previous ``.ix`` indexer was removed in
    # pandas 1.0 and raises AttributeError there.
    first_full_std = rolling_std.iloc[window_size - 1]
    rolling_std = rolling_std.fillna(first_full_std).round(3).tolist()

    std = np.std(residual)

    anomalies = collections.OrderedDict()
    for index, (y_i, avg_i, rs_i) in enumerate(zip(y, avg.tolist(), rolling_std)):
        band = sigma * rs_i
        if y_i > avg_i + band or y_i < avg_i - band:
            anomalies[index] = y_i

    return {'stationary standard_deviation': round(std, 3),
            'anomalies_dict': anomalies}
# This function is responsible for displaying how the anomaly detection performs on the given dataset.
def plot_results(x, y, window_size, sigma_value=1,
                 text_xlabel="X Axis", text_ylabel="Y Axis", applying_rolling_std=False):
    """ Plots the data, its moving average and the points flagged as anomalies.

    Supports both rolling and stationary standard deviation. Use
    'applying_rolling_std' to switch between the two. The figure is displayed
    with ``plt.show()`` before the function returns.

    Args:
    -----
        x (pandas.Series): values for the horizontal axis
        y (pandas.Series): observed values, same length as x
        window_size (int): rolling window size
        sigma_value (int): number of standard deviations that defines the band
        text_xlabel (str): label for annotating the X Axis
        text_ylabel (str): label for annotating the Y Axis
        applying_rolling_std (boolean): True/False for using rolling vs stationary standard deviation
    """
    plt.figure(figsize=(15, 8))
    plt.plot(x, y, "k.", markersize=3)
    y_av = moving_average(y, window_size)
    plt.plot(x, y_av, color='green', linewidth=2.5)
    plt.xlabel(text_xlabel)
    plt.ylabel(text_ylabel)

    # Query for the anomalies and plot the same
    if applying_rolling_std:
        events = explain_anomalies_rolling_std(y, window_size=window_size, sigma=sigma_value)
    else:
        events = explain_anomalies(y, window_size=window_size, sigma=sigma_value)

    anomalies = events['anomalies_dict']
    positions = np.fromiter(anomalies.keys(), dtype=int, count=len(anomalies))
    y_anomaly = np.fromiter(anomalies.values(), dtype=float, count=len(anomalies))
    # The dict keys are positions within y, not x coordinates. Look up the
    # matching x values so the markers land on the flagged points even when x
    # is not simply 0..n-1 (for example a time column).
    x_anomaly = np.asarray(x)[positions]
    plt.plot(x_anomaly, y_anomaly, "ro", markersize=6)

    # add grid and lines and enable the plot
    plt.grid(True)
    plt.show()
# This function is responsible for plotting the given dataset together with its moving average.
def plot_results_1(x, y, window_size, sigma_value=1,
                   text_xlabel="X Axis", text_ylabel="Y Axis", applying_rolling_std=False):
    """ Draws the data points and their moving average on a new figure.

    No anomalies are computed or marked here, and the figure is not shown:
    the caller decides when to call ``plt.show()``.

    Args:
    -----
        x (pandas.Series): values for the horizontal axis
        y (pandas.Series): observed values
        window_size (int): rolling window size used for the moving average
        sigma_value (int): accepted but not used by this function
        text_xlabel (str): label for annotating the X Axis
        text_ylabel (str): label for annotating the Y Axis
        applying_rolling_std (boolean): accepted but not used by this function
    """
    plt.figure(figsize=(15, 8))
    # Raw samples as black dots.
    plt.plot(x, y, "k.", markersize=5)
    # Smoothed curve drawn over the samples.
    smoothed = moving_average(y, window_size)
    plt.plot(x, smoothed, color='green')
    plt.xlabel(text_xlabel)
    plt.ylabel(text_ylabel)
# 4. Let's play with the functions
# Load the recorded samples; the 'time' and 'acceleration' columns are read below.
data_as_frame = pd.read_csv(r'C:\Codes\Medium\acceleration_data.csv', index_col=False)
x = data_as_frame['time']
Y = data_as_frame['acceleration']

# plot the results
# Acceleration is measured in m/s^2, so the axis label states that unit.
plot_results(x=x, y=Y, window_size=20, sigma_value=3, text_xlabel="time (s)",
             text_ylabel="acceleration (m/s^2)")
events = explain_anomalies(Y, window_size=20, sigma=3)

# Display the anomaly dict
print("Information about the anomalies model:{}".format(events))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment