Created
January 18, 2018 14:48
-
-
Save krinkere/f1bac3beb99da3794632108adfb58289 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
from itertools import izip, count | |
import matplotlib.pyplot as plt | |
from numpy import linspace, loadtxt, ones, convolve | |
import numpy as np | |
import pandas as pd | |
import collections | |
from random import randint | |
from matplotlib import style | |
style.use('fivethirtyeight') | |
%matplotlib inline | |
# 1. Download sunspot dataset and upload the same to dataset directory | |
# Load the sunspot dataset as an Array | |
# !mkdir -p dataset | |
# !wget -c -b http://www-personal.umich.edu/~mejn/cp/data/sunspots.txt -P dataset | |
data = loadtxt("dataset/sunspots.txt", float) | |
# 2. View the data as a table | |
data_as_frame = pd.DataFrame(data, columns=['Months', 'SunSpots']) | |
data_as_frame.head() | |
# 3. Lets define some use-case specific UDF(User Defined Functions) | |
def moving_average(data, window_size): | |
""" Computes moving average using discrete linear convolution of two one dimensional sequences. | |
Args: | |
----- | |
data (pandas.Series): independent variable | |
window_size (int): rolling window size | |
Returns: | |
-------- | |
ndarray of linear convolution | |
References: | |
------------ | |
[1] Wikipedia, "Convolution", http://en.wikipedia.org/wiki/Convolution. | |
[2] API Reference: https://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html | |
""" | |
window = np.ones(int(window_size))/float(window_size) | |
return np.convolve(data, window, 'same') | |
def explain_anomalies(y, window_size, sigma=1.0): | |
""" Helps in exploring the anamolies using stationary standard deviation | |
Args: | |
----- | |
y (pandas.Series): independent variable | |
window_size (int): rolling window size | |
sigma (int): value for standard deviation | |
Returns: | |
-------- | |
a dict (dict of 'standard_deviation': int, 'anomalies_dict': (index: value)) | |
containing information about the points indentified as anomalies | |
""" | |
avg = moving_average(y, window_size).tolist() | |
residual = y - avg | |
# Calculate the variation in the distribution of the residual | |
std = np.std(residual) | |
return {'standard_deviation': round(std, 3), | |
'anomalies_dict': collections.OrderedDict([(index, y_i) for | |
index, y_i, avg_i in izip(count(), y, avg) | |
if (y_i > avg_i + (sigma*std)) | (y_i < avg_i - (sigma*std))])} | |
def explain_anomalies_rolling_std(y, window_size, sigma=1.0): | |
""" Helps in exploring the anamolies using rolling standard deviation | |
Args: | |
----- | |
y (pandas.Series): independent variable | |
window_size (int): rolling window size | |
sigma (int): value for standard deviation | |
Returns: | |
-------- | |
a dict (dict of 'standard_deviation': int, 'anomalies_dict': (index: value)) | |
containing information about the points indentified as anomalies | |
""" | |
avg = moving_average(y, window_size) | |
avg_list = avg.tolist() | |
residual = y - avg | |
# Calculate the variation in the distribution of the residual | |
testing_std = pd.rolling_std(residual, window_size) | |
testing_std_as_df = pd.DataFrame(testing_std) | |
rolling_std = testing_std_as_df.replace(np.nan, | |
testing_std_as_df.ix[window_size - 1]).round(3).iloc[:,0].tolist() | |
std = np.std(residual) | |
return {'stationary standard_deviation': round(std, 3), | |
'anomalies_dict': collections.OrderedDict([(index, y_i) | |
for index, y_i, avg_i, rs_i in izip(count(), | |
y, avg_list, rolling_std) | |
if (y_i > avg_i + (sigma * rs_i)) | (y_i < avg_i - (sigma * rs_i))])} | |
# This function is repsonsible for displaying how the function performs on the given dataset. | |
def plot_results(x, y, window_size, sigma_value=1, | |
text_xlabel="X Axis", text_ylabel="Y Axis", applying_rolling_std=False): | |
""" Helps in generating the plot and flagging the anamolies. | |
Supports both moving and stationary standard deviation. Use the 'applying_rolling_std' to switch | |
between the two. | |
Args: | |
----- | |
x (pandas.Series): dependent variable | |
y (pandas.Series): independent variable | |
window_size (int): rolling window size | |
sigma_value (int): value for standard deviation | |
text_xlabel (str): label for annotating the X Axis | |
text_ylabel (str): label for annotatin the Y Axis | |
applying_rolling_std (boolean): True/False for using rolling vs stationary standard deviation | |
""" | |
plt.figure(figsize=(15, 8)) | |
plt.plot(x, y, "k.") | |
y_av = moving_average(y, window_size) | |
plt.plot(x, y_av, color='green') | |
plt.xlim(0, 1000) | |
plt.xlabel(text_xlabel) | |
plt.ylabel(text_ylabel) | |
# Query for the anomalies and plot the same | |
events = {} | |
if applying_rolling_std: | |
events = explain_anomalies_rolling_std(y, window_size=window_size, sigma=sigma_value) | |
else: | |
events = explain_anomalies(y, window_size=window_size, sigma=sigma_value) | |
x_anomaly = np.fromiter(events['anomalies_dict'].iterkeys(), dtype=int, count=len(events['anomalies_dict'])) | |
y_anomaly = np.fromiter(events['anomalies_dict'].itervalues(), dtype=float, | |
count=len(events['anomalies_dict'])) | |
plt.plot(x_anomaly, y_anomaly, "r*", markersize=12) | |
# add grid and lines and enable the plot | |
plt.grid(True) | |
plt.show() | |
# 4. Lets play with the functions | |
x = data_as_frame['Months'] | |
Y = data_as_frame['SunSpots'] | |
# plot the results | |
plot_results(x, y=Y, window_size=10, text_xlabel="Months", sigma_value=3, | |
text_ylabel="No. of Sun spots") | |
events = explain_anomalies(y, window_size=5, sigma=3) | |
# Display the anomaly dict | |
print("Information about the anomalies model:{}".format(events)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment