Skip to content

Instantly share code, notes, and snippets.

@krinkere
Created January 18, 2018 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krinkere/f1bac3beb99da3794632108adfb58289 to your computer and use it in GitHub Desktop.
Save krinkere/f1bac3beb99da3794632108adfb58289 to your computer and use it in GitHub Desktop.
from __future__ import division
from itertools import izip, count
import matplotlib.pyplot as plt
from numpy import linspace, loadtxt, ones, convolve
import numpy as np
import pandas as pd
import collections
from random import randint
from matplotlib import style
style.use('fivethirtyeight')
%matplotlib inline
# 1. Download sunspot dataset and upload the same to dataset directory
# Load the sunspot dataset as an Array
# !mkdir -p dataset
# !wget -c -b http://www-personal.umich.edu/~mejn/cp/data/sunspots.txt -P dataset
data = loadtxt("dataset/sunspots.txt", float)
# 2. View the data as a table
data_as_frame = pd.DataFrame(data, columns=['Months', 'SunSpots'])
data_as_frame.head()
# 3. Lets define some use-case specific UDF(User Defined Functions)
def moving_average(data, window_size):
""" Computes moving average using discrete linear convolution of two one dimensional sequences.
Args:
-----
data (pandas.Series): independent variable
window_size (int): rolling window size
Returns:
--------
ndarray of linear convolution
References:
------------
[1] Wikipedia, "Convolution", http://en.wikipedia.org/wiki/Convolution.
[2] API Reference: https://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html
"""
window = np.ones(int(window_size))/float(window_size)
return np.convolve(data, window, 'same')
def explain_anomalies(y, window_size, sigma=1.0):
""" Helps in exploring the anamolies using stationary standard deviation
Args:
-----
y (pandas.Series): independent variable
window_size (int): rolling window size
sigma (int): value for standard deviation
Returns:
--------
a dict (dict of 'standard_deviation': int, 'anomalies_dict': (index: value))
containing information about the points indentified as anomalies
"""
avg = moving_average(y, window_size).tolist()
residual = y - avg
# Calculate the variation in the distribution of the residual
std = np.std(residual)
return {'standard_deviation': round(std, 3),
'anomalies_dict': collections.OrderedDict([(index, y_i) for
index, y_i, avg_i in izip(count(), y, avg)
if (y_i > avg_i + (sigma*std)) | (y_i < avg_i - (sigma*std))])}
def explain_anomalies_rolling_std(y, window_size, sigma=1.0):
""" Helps in exploring the anamolies using rolling standard deviation
Args:
-----
y (pandas.Series): independent variable
window_size (int): rolling window size
sigma (int): value for standard deviation
Returns:
--------
a dict (dict of 'standard_deviation': int, 'anomalies_dict': (index: value))
containing information about the points indentified as anomalies
"""
avg = moving_average(y, window_size)
avg_list = avg.tolist()
residual = y - avg
# Calculate the variation in the distribution of the residual
testing_std = pd.rolling_std(residual, window_size)
testing_std_as_df = pd.DataFrame(testing_std)
rolling_std = testing_std_as_df.replace(np.nan,
testing_std_as_df.ix[window_size - 1]).round(3).iloc[:,0].tolist()
std = np.std(residual)
return {'stationary standard_deviation': round(std, 3),
'anomalies_dict': collections.OrderedDict([(index, y_i)
for index, y_i, avg_i, rs_i in izip(count(),
y, avg_list, rolling_std)
if (y_i > avg_i + (sigma * rs_i)) | (y_i < avg_i - (sigma * rs_i))])}
# This function is repsonsible for displaying how the function performs on the given dataset.
def plot_results(x, y, window_size, sigma_value=1,
text_xlabel="X Axis", text_ylabel="Y Axis", applying_rolling_std=False):
""" Helps in generating the plot and flagging the anamolies.
Supports both moving and stationary standard deviation. Use the 'applying_rolling_std' to switch
between the two.
Args:
-----
x (pandas.Series): dependent variable
y (pandas.Series): independent variable
window_size (int): rolling window size
sigma_value (int): value for standard deviation
text_xlabel (str): label for annotating the X Axis
text_ylabel (str): label for annotatin the Y Axis
applying_rolling_std (boolean): True/False for using rolling vs stationary standard deviation
"""
plt.figure(figsize=(15, 8))
plt.plot(x, y, "k.")
y_av = moving_average(y, window_size)
plt.plot(x, y_av, color='green')
plt.xlim(0, 1000)
plt.xlabel(text_xlabel)
plt.ylabel(text_ylabel)
# Query for the anomalies and plot the same
events = {}
if applying_rolling_std:
events = explain_anomalies_rolling_std(y, window_size=window_size, sigma=sigma_value)
else:
events = explain_anomalies(y, window_size=window_size, sigma=sigma_value)
x_anomaly = np.fromiter(events['anomalies_dict'].iterkeys(), dtype=int, count=len(events['anomalies_dict']))
y_anomaly = np.fromiter(events['anomalies_dict'].itervalues(), dtype=float,
count=len(events['anomalies_dict']))
plt.plot(x_anomaly, y_anomaly, "r*", markersize=12)
# add grid and lines and enable the plot
plt.grid(True)
plt.show()
# 4. Lets play with the functions
x = data_as_frame['Months']
Y = data_as_frame['SunSpots']
# plot the results
plot_results(x, y=Y, window_size=10, text_xlabel="Months", sigma_value=3,
text_ylabel="No. of Sun spots")
events = explain_anomalies(y, window_size=5, sigma=3)
# Display the anomaly dict
print("Information about the anomalies model:{}".format(events))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment