Skip to content

Instantly share code, notes, and snippets.

@vsbabu
Last active May 5, 2020 04:53
Show Gist options
  • Save vsbabu/9f9bcfbbba12bc988cc71cb53d5c1418 to your computer and use it in GitHub Desktop.
Save vsbabu/9f9bcfbbba12bc988cc71cb53d5c1418 to your computer and use it in GitHub Desktop.
Anomaly detection on a daily timeseries; highlight the output with pandas html masks
#!/usr/bin/env python3
import pandas as pd
import numpy as np
from fbprophet import Prophet
import sys
import argparse
import datetime
today = datetime.date.today()
argparser = argparse.ArgumentParser(description="Reads dt,value without header from stdin and prints a weekly folded html with anomalies", add_help=True)
argparser.add_argument("--start_week_date", "-s", help="Start date; must be a Monday")
argparser.add_argument("--how_many_weeks", "-k", help="For how many weeks?")
argparser.add_argument("--time_format", "-t", help="Strftime pattern to parse (%y%m%d)")
argparser.add_argument("--output", "-o", help="Output HTML file")
argparser.add_argument("--positive_color", "-p", help="Color for anomaly above band (lightgreen)")
argparser.add_argument("--negative_color", "-n", help="Color for anomaly below band (orange)")
argparser.add_argument("--future_color", "-f", help="Color for future predictions (silver)")
args = argparser.parse_args()
if args.how_many_weeks:
args.how_many_weeks = int(args.how_many_weeks)
else:
args.how_many_weeks = 8
if not args.start_week_date:
args.start_week_date = today - datetime.timedelta(days=today.weekday(), weeks = args.how_many_weeks)
if not args.time_format:
args.time_format = "%y%m%d"
if not args.positive_color:
args.positive_color = "lightgreen"
if not args.negative_color:
args.negative_color = "orange"
if not args.future_color:
args.future_color = "silver"
dfi = pd.read_csv(sys.stdin, sep=",", header=None, names=["ds", "y"])
dates = pd.date_range(args.start_week_date, periods=((args.how_many_weeks+0) * 7))
remaining_days_in_week = dates[-1].dayofweek - dates[-1].today().dayofweek
todayf = today.strftime(args.time_format)
if todayf not in dfi.ds.values:
#if today's data is not there, let us predict it
remaining_days_in_week = remaining_days_in_week + 1
dfi["ds"] = pd.to_datetime(dfi.ds, format=args.time_format)
dfi = dfi.set_index("ds")
dfi = dfi.resample("D").asfreq().fillna(0)
dfi.v = dfi.y.astype(int)
dfi.reset_index(inplace=True)
dfi.columns = ["ds", "y"]
def fit_predict_model(dataframe, interval_width=0.99, changepoint_range=0.8,
future_periods=0):
# your data MAY HAVE weekly/monthly seasonality and holidays. Change accordingly
m = Prophet(
daily_seasonality=False,
yearly_seasonality=False,
weekly_seasonality=False,
seasonality_mode="additive",
interval_width=interval_width,
changepoint_range=changepoint_range,
)
m = m.fit(dataframe)
forecast = m.predict(dataframe)
forecast["fact"] = dataframe["y"].reset_index(drop=True)
fdataframe = m.make_future_dataframe(periods=future_periods)
forecast_f = m.predict(fdataframe)
forecast_f["fact"] = forecast_f["yhat"].reset_index(drop=True)
forecast_f['fact'] = forecast_f.fact.astype(int)
return (forecast, forecast_f)
def detect_anomalies(forecast):
forecasted = forecast[
["ds", "trend", "yhat", "yhat_lower", "yhat_upper", "fact"]
].copy()
#add an anomaly value of 0 for everything
#make it 1 or -1 for those facts above or below yhat bands
forecasted["anomaly"] = 0
forecasted.loc[forecasted["fact"] > forecasted["yhat_upper"], "anomaly"] = 1
forecasted.loc[forecasted["fact"] < forecasted["yhat_lower"], "anomaly"] = -1
# anomaly importances
forecasted["importance"] = 0
forecasted.loc[forecasted["anomaly"] == 1, "importance"] = (
forecasted["fact"] - forecasted["yhat_upper"]
) / forecast["fact"]
forecasted.loc[forecasted["anomaly"] == -1, "importance"] = (
forecasted["yhat_lower"] - forecasted["fact"]
) / forecast["fact"]
return forecasted
def highlight_anomaly_wk(d):
p_anom = f"background-color: {args.positive_color}"
n_anom = f"background-color: {args.negative_color}"
f_anom = f"background-color: {args.future_color}"
regular = ""
df1 = pd.DataFrame(regular, index=d.index, columns=d.columns)
for wd in "mon tue wed thu fri sat sun".split():
a = "a_" + wd
p_mask = d[a] > 0
n_mask = d[a] == -1
f_mask = d[a] < -1
df1.loc[p_mask, wd] = p_anom
df1.loc[n_mask, wd] = n_anom
df1.loc[f_mask, wd] = f_anom
return df1
(pred, fut) = fit_predict_model(dfi, 0.7, 0.3, future_periods = remaining_days_in_week)
pred = detect_anomalies(pred)
fut['anomaly'] = -2
dfi.set_index("ds", inplace=True)
dfd = dfi.loc[dates]
edf = pred[["ds", "fact", "anomaly"]].copy()
edfut = fut[["ds", "fact", "anomaly"]].copy()
edf = edf.combine_first(edfut)
edf.set_index("ds", inplace=True)
edf=edf.reindex(dates)
edf['fact'] = edf.fact.astype(int)
# Now, fold the df by weeks. fact goes to a day value and anomaly goes to a_day value
wkv = edf[["fact", "anomaly"]].copy().values.reshape(args.how_many_weeks+0, 7 * 2)
efw = edf[edf.index.weekday_name == "Monday"]
efw = pd.DataFrame(
wkv,
index=efw.index,
columns=[
"mon",
"a_mon",
"tue",
"a_tue",
"wed",
"a_wed",
"thu",
"a_thu",
"fri",
"a_fri",
"sat",
"a_sat",
"sun",
"a_sun",
],
)
efw.index.name = "week"
efw.sort_index(ascending=False, inplace=True)
html = efw.style.apply(highlight_anomaly_wk, axis=None).hide_columns(
["a_mon", "a_tue", "a_wed", "a_thu", "a_fri", "a_sat", "a_sun"]
)
if args.output:
with open(args.output, "w") as f:
f.write(html.render())
else:
print(html.render())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment