@welch
Last active August 29, 2015 14:19
testbed for juttle forecasting

Streaming statistics, forecasting, and anomaly detection in Juttle

// anomaly detection based on Median Absolute Deviation estimates of
// standard deviation (robust to outliers and non-normal data)
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/stdj.juttle' as stdj;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/math.juttle' as math;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mav.juttle' as mav;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/seasonal.juttle' as seasonal;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/forecast.juttle' as forecast;
export sub normalize_daily(in, out) {
seasonal.squash_daily -in in -out '_normalize_squashed'
| mav.normalize -over :3d: -in '_normalize_squashed' -out out
| remove _normalize_squashed
}
export sub normalize_daily_by(in, out, by) {
seasonal.squash_daily_by -in in -out '_normalize_squashed' -by by
| mav.normalize_by -over :3d: -in '_normalize_squashed' -out out -by by
| remove _normalize_squashed
}
export sub normalize_forecast_daily(in, out) {
seasonal.squash_daily -in in -out '_normalize_squashed'
| forecast.forecast_err -in '_normalize_squashed' -out '_normalize_err'
| mav.normalize -over :3d: -in '_normalize_err' -out out
| remove _normalize_squashed, _normalize_err
}
export sub normalize_forecast_daily_by(in, out, by) {
seasonal.squash_daily_by -in in -out '_normalize_squashed' -by by
| forecast.forecast_err_by -in '_normalize_squashed' -out '_normalize_err' -by by
| mav.normalize_by -over :3d: -in '_normalize_err' -out out -by by
| remove _normalize_squashed, _normalize_err
}
export sub outlier_daily(in, out, after, sigma) {
normalize_daily -in in -out out
| put *out = (time > after && Math.abs(*out) > sigma) ? Math.abs(*out) : 0
}
export sub outlier_daily_by(in, out, after, sigma, by) {
normalize_daily_by -in in -out out -by by
| put *out = (time > after && Math.abs(*out) > sigma) ? Math.abs(*out) : 0
}
export sub outlier_forecast_daily(in, out, after, sigma) {
normalize_forecast_daily -in in -out out
| put *out = (time > after && Math.abs(*out) > sigma) ? Math.abs(*out) : 0
}
export sub outlier_forecast_daily_by(in, out, after, sigma, by) {
normalize_forecast_daily_by -in in -out out -by by
| put *out = (time > after && Math.abs(*out) > sigma) ? Math.abs(*out) : 0
}
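//
// illustrative example (a sketch, not part of the anomaly API above): run
// outlier_daily over a synthetic hourly series with one injected spike. the
// spike hour and magnitude are hypothetical; after the training window only
// the spike should survive the sigma test.
//
export sub demo_outlier_daily() {
const spike_at = 130; // hypothetical hour index where the spike is injected
emit -from :2014-01-01: -limit 240 -every :h:
| put n = count(), cpu = 10 + 2 * Math.sin(n / 24 * 2 * Math.PI) + ((n == spike_at) ? 50 : 0)
| outlier_daily -in 'cpu' -out 'sigma' -after :2014-01-04: -sigma 3
| filter sigma > 0
| @logger -title "flagged outliers"
}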
export sub normalize_weekly(in, out) {
seasonal.squash_weekly -in in -out '_normalize_squashed'
| mav.normalize -over :3w: -in '_normalize_squashed' -out out
| remove _normalize_squashed
}
export sub normalize_forecast_weekly(in, out) {
seasonal.squash_weekly -in in -out '_normalize_squashed'
| forecast.forecast_err -in '_normalize_squashed' -out '_normalize_err'
| mav.normalize -over :3w: -in '_normalize_err' -out out
| remove _normalize_squashed, _normalize_err
}
export sub normalize_weekly_by(in, out, by) {
seasonal.squash_weekly_by -in in -out '_normalize_squashed' -by by
| mav.normalize_by -over :3w: -in '_normalize_squashed' -out out -by by
| remove _normalize_squashed
}
export sub outlier_weekly(in, out, after, sigma) {
normalize_weekly -in in -out out
| put *out = (time > after && Math.abs(*out) > sigma) ? Math.abs(*out) : 0
}
export sub outlier_weekly_by(in, out, after, sigma, by) {
normalize_weekly_by -in in -out out -by by
| put *out = (time > after && Math.abs(*out) > sigma) ? Math.abs(*out) : 0
}
//
// convert the output points of, e.g., outlier_daily into an event
// stream to be overlaid on a timechart having the specified title. a
// nonzero input value indicates an event condition, and the maximum
// nonzero input value/time in each interval will be selected for reporting.
// successive nonzero intervals will be suppressed as belonging to the same
// initial event.
//
export sub to_events(in, every, after, title) {
(
// mark the end of training window
put _training_data = (time < after)
| put _end_training = (!_training_data && stdj.previous(_training_data, true))
| filter _end_training == true
| remove _training_data, _end_training, in
| put text = "End of training window"
| @events -on title;
// mark non-contiguous outlier events
batch -every every | percentile -p 1.0 in | unbatch // select max over each interval
| put _outlier_event = (*in > 0)
| put _deduped_event = _outlier_event && !stdj.previous(_outlier_event, false)
| filter _deduped_event == true
| remove _outlier_event, _deduped_event
| put text = "Outlier ("+math.roundStr(*in, 1)+"-sigma)", type = "fa-exclamation-triangle", label=in
| @events -on title;
merge;
)
}
export sub to_events_by(in, every, after, by, title) {
(
// mark the end of training window
put _training_data = (time < after)
| put _end_training = (!_training_data && stdj.previous(_training_data, true))
| filter _end_training == true
| remove _training_data, _end_training, in
| put text = "End of training window"
| @events -on title;
// mark non-contiguous outlier events
batch -every every | percentile -p 1.0 in by by | unbatch // select max over each interval
| put _outlier_event = (*in > 0)
| put _deduped_event = _outlier_event && !stdj.previous(_outlier_event, false) by by
| filter _deduped_event == true
| remove _outlier_event, _deduped_event
| put text = "Outlier ("+math.roundStr(*in, 1)+"-sigma)", type = "fa-exclamation-triangle", label=*by
| @events -on title;
merge;
)
}
export sub outlier_chart_daily(in, every, after, sigma, title) {
const sig_title = title + " " + Number.toString(sigma)+'-sigma outliers';
// normalize each input series, and flag outliers n-sigmas from the mean.
outlier_daily -in in -out 'sigma' -after after -sigma sigma
| filter time > after
| ts.split_dual_chart -title sig_title -secondary 'sigma'
// generate an event stream from the outliers just computed, and overlay it on the timechart
| to_events -in 'sigma' -every every -after after -title sig_title
| stdj.end
}
export sub outlier_chart_daily_by(in, every, after, sigma, by, title) {
const sig_title = title + " " + Number.toString(sigma)+'-sigma outliers';
// normalize each input series, and flag outliers n-sigmas from the mean.
outlier_daily_by -in in -out in -after after -sigma sigma -by by
| filter time > after
| ts.chart_by -by by -title sig_title // split makes this difficult to combine with the originals
// generate an event stream from the outliers just computed, and overlay it on the timechart
| to_events_by -in in -every every -after after -by by -title title
| stdj.end
}
export sub outlier_chart_forecast_daily(in, every, after, sigma, title) {
const sig_title = title + " " + Number.toString(sigma)+'-sigma outliers';
// normalize each input series, and flag outliers n-sigmas from the mean.
outlier_forecast_daily -in in -out 'sigma' -after after -sigma sigma
| filter time > after
| ts.split_dual_chart -title sig_title -secondary 'sigma'
// generate an event stream from the outliers just computed, and overlay it on the timechart
| to_events -in 'sigma' -every every -after after -title sig_title
| stdj.end
}
export sub outlier_chart_forecast_daily_by(in, every, after, sigma, title,by) {
const sig_title = title + " " + Number.toString(sigma)+'-sigma outliers';
// normalize each input series, and flag outliers n-sigmas from the mean.
outlier_forecast_daily_by -in in -out 'sigma' -after after -sigma sigma -by by
| filter time > after
| ts.split_dual_chart_by -title sig_title -secondary 'sigma' -by by
// generate an event stream from the outliers just computed, and overlay it on the timechart
| to_events_by -in 'sigma' -every every -after after -title sig_title -by by
| stdj.end
}
export sub outlier_chart_weekly(in, every, after, sigma, title) {
const sig_title = title + " " + Number.toString(sigma)+'-sigma outliers';
// normalize each input series, and flag outliers n-sigmas from the mean.
outlier_weekly -in in -out 'sigma' -after after -sigma sigma
| filter time > after
| ts.split_dual_chart -title sig_title -secondary 'sigma'
// generate an event stream from the outliers just computed, and overlay it on the timechart
| to_events -in 'sigma' -every every -after after -title sig_title
| stdj.end
}
export sub charts_weekly_by(in, every, after, sigma, by, title) {
const sig_title = title + " " + Number.toString(sigma)+'-sigma outliers';
// normalize each input series, and flag outliers n-sigmas from the mean.
outlier_weekly_by -in in -out in -after after -sigma sigma -by by
| filter time > after
| ts.chart_by -by by -title sig_title // split makes this difficult to combine with the originals
// generate an event stream from the outliers just computed, and overlay it on the timechart
| to_events_by -in in -every every -after after -by by -title title
| stdj.end
}
import "math.juttle" as math;
import "sources.juttle" as sources;
import "interp.juttle" as interp;
import "trend.juttle" as trend;
import "seasonal.juttle" as seasonal;
source 'https://gist.githubusercontent.com/welch/e4961c137b8e4903ce2e/raw/0c6bc636f560c5a15d8e3cf7af8f748fdce2ea38/kjh.json'
| keep humidity, pressure, temp, temp2, time
| split
| (
filter name ~ /temp/ | @timechart;
filter name !~ /temp/ | @timechart;
) | @logger
// exponential forecast model.
//
// This uses exponential weighted averages to infer a time-varying
// mean and variance of a stochastic process underlying a series of
// noisy observations. It automatically selects the smoothing
// parameter to minimize the RSS of one-step-ahead prediction errors
// over a specified window.
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/stdj.juttle' as stdj;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/math.juttle' as math;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/sources.juttle' as sources;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/seasonal.juttle' as seasonal;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mav.juttle' as mav;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mad.juttle' as mad;
//
// fit_coeffs: fit an exponential smoothing model to a batch or window of points.
// This is your trusty moving average update [alpha * new + (1 - alpha) * old]
// dressed up so we can track the prediction error and dial in alpha that minimizes
// RSS over the interval.
//
// NOTE: This must be called with -acc true so that model state persists across epochs.
//
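// (the update below, level = level + alpha * err with err = new - level, is
// algebraically the same as the textbook form alpha * new + (1 - alpha) * level.)
//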
export reducer fit_coeffs(in, initial_field) {
const default_alpha = 0.5; // smoothing weight used when no initial state is provided
var t0, level, err, last_time;
var alpha;
var initialized = false;
var PROD_2991;
function initialize(time, in0, initial) {
if (initial == null) {
// default initialization from first point
level = in0 ;
t0 = time;
alpha = default_alpha;
} else {
// initialize using upstream state in initial
level = initial[0];
t0 = initial[1];
alpha = initial[2];
}
err = null;
last_time = null;
initialized = true;
}
function update() {
var time = *'time';
if (!initialized) {
var initial = (initial_field != null)? *initial_field : null;
PROD_2991 = initialize(time, *in, initial);
}
if (last_time != null) {
// classic EWMA, rewritten as forecast error
err = *in - level;
level = level + alpha * err;
}
last_time = time;
}
function result() {
return [level, t0, alpha, err];
}
function expire() {
// never expire data or reset this reducer
}
}
//
// value:
// forecast the mean at the given time.
// This is pretty dull for single-exponential forecasting!
//
export function value(t, coeffs, default) {
return math.finite(coeffs[0], default);
}
export sub forecast(in, out) {
put *out = value(time, fit_coeffs(in, null), *in)
}
export sub forecast_by(in, out, by) {
put *out = value(time, fit_coeffs(in, null), *in) by by
}
export sub forecast_err(in, out) {
put *out = fit_coeffs(in, null)[3]
}
export sub forecast_err_by(in, out, by ) {
put *out = fit_coeffs(in, null)[3] by by
}
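//
// illustrative example (a sketch, not part of the original module): forecast a
// noiseless ramp. with the default alpha of 0.5 the level absorbs half of each
// one-step-ahead error, so 'fc' trails the ramp by a constant offset once it settles.
//
export sub demo_ramp() {
emit -from :2014-01-01: -limit 50 -every :10m:
| put y = count()
| forecast -in 'y' -out 'fc'
| forecast_err -in 'y' -out 'err'
| split
| @timechart -title "EWMA forecast of a ramp" -display.dataDensity 0
}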
export sub run() {
const from = :2014-01-01: ;
const dur = :7w: ;
const to = from + :7w:;
const halfto = from + :2w:;
sources.bumpy_cpu -from from -to from + dur -every :h:
| (
forecast -in 'cpu' -out 'forecast'
| ts.split_chart -title "simulated cpu";
seasonal.squash_weekly -in 'cpu' -out 'squashed'
| forecast -in 'squashed' -out 'forecast'
| ts.split_chart -title "deseasonal forecast";
seasonal.squash_weekly -in 'cpu' -out 'squashed'
| forecast_err -in 'squashed' -out 'err'
| remove squashed
| ts.split_chart -title "deseasonal forecast error";
seasonal.squash_weekly -in 'cpu' -out 'squashed'
| forecast_err -in 'squashed' -out 'forecast_err'
| mad.normalize -in 'forecast_err' -out 'sigma' -over :d:
| remove squashed
| ts.split_chart -title "deseasonal normalized forecast error"
) | stdj.end
}
run
// periodic interpolation functions.
//
//
// project_time: project time onto a cycle of intervals of duration period/n.
// Return [idx, u] where 0 <= idx < n is the periodic interval this
// time falls into, and 0 <= u < 1 is the coordinate within the
// unit interval
//
export function project_time(time, n, period) {
var interval = period / n;
if (time == null) { return [0, 0]; }
var t0 = Date.quantize(time, interval);
var u = (time - t0) / interval;
var idx = Math.floor((t0 - Date.new(0)) / interval) % n;
return [idx, u];
}
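//
// worked example: with period :30s: and n == 5 the interval is :6s:, so a time
// 8 seconds past the epoch quantizes to t0 at 6s and project_time returns
// [1, 1/3] -- the second interval, one third of the way through it.
//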
//
// pwl: piecewise-linear interpolation (periodic)
// interpolate values to get a continuous periodic function of time composed
// of lines connecting the values in A.
// A is an array of samples representing an overall cycle of duration period
// with samples uniformly spaced and s.t. A[0] should be the value at t==0.
//
export function pwl(time, A, len, period) {
var IDX = project_time(time, len, period);
var idx=IDX[0], u=IDX[1];
return (1 - u) * A[idx] + u * A[ (idx+1) % len ];
}
//
// catrom_weights: weighting for successive points for a given 0 <= u < 1
//
function catrom_weights(u) {
var u2 = u*u;
var u3 = u2 * u;
var W = [-u/2 + u2 - u3/2,
1 - 2.5 * u2 + 1.5 * u3,
u / 2 + 2 * u2 - 1.5 * u3,
-u2 / 2 + u3 / 2];
return W;
}
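//
// (the weights always sum to 1; at u == 0 they reduce to [0, 1, 0, 0] and at the
// far end of the interval to [0, 0, 1, 0], so the fitted curve passes through
// the control points themselves.)
//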
//
// project_cubic: cubic spline interpolation along with segment index and weights
// Return an array [val, idx, W], where
// val == W[0]*A[idx] + W[1]*A[idx+1] + W[2]*A[idx+2] + W[3]*A[idx+3], indexed modulo len
//
export function project_cubic(time, A, len, period) {
var IDX = project_time(time, len, period);
var idx=(IDX[0] + len - 1) % len, u=IDX[1];
var W = catrom_weights(u);
var val = W[0]*A[idx] + W[1]*A[(idx+1) % len] + W[2]*A[(idx+2) % len] + W[3]*A[(idx+3) % len];
return [val, idx, W, u];
}
//
// cubic: cubic spline interpolation (periodic)
// interpolate values to get a smooth periodic function of time.
// A is an array of samples representing an overall cycle of duration period
// with samples uniformly spaced and s.t. A[0] should be the value at t==0.
//
export function cubic(time, A, len, period) {
return project_cubic(time, A, len, period)[0];
}
//
// demo: draw two periods of a cycle, interpolating linearly and cubically
//
export sub run() {
const values = [10, 5, 20, 15, 30];
const N = 5;
emit -from Date.new(0) -limit 60
| put PWL = pwl(time, values, N, :30s:)
| put curve = cubic(time, values, N, :30s:)
| split
| @timechart
}
run
// Limelight:
//
// Apply seasonal outlier detection to the limelight rollup and constituents
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/seasonal.juttle' as seasonal;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/anomaly.juttle' as anomaly;
//
// load a month of data and initialize weekly and daily de-seasonalizers from the first 3-weeks,
// then compute variance and outliers for the residual.
//
// This is hard-wired to source a particular traffic rollup. It shows a strong
// weekend effect, but ultimately no 3-sigma anomalies are flagged. It is a
// good demonstration of the weekly and daily de-seasonalizer.
//
// many hosts that contributed to this rollup are interesting by themselves when
// processed in the same way as the rollup:
//
// cds38.pmo.llnw.net // regular daily with a big spike && a few smaller ones
// cds1042.lga.llnw.net // tapers at end of month -- the first low day is flagged
// cds128.lin.llnw.net // regular daily with choppy waters at end of month
// cds186.yyz.llnw.net // extremely non-normal, counterexample for this approach
//
export sub run() {
const preroll = :30 days ago: + :21d:; // end of training window (at least a week long)
const in = 'value'; // name of metric field to use
const outlier = 3; // alert on 3 sigma values
read -last :30d: 'zbx.eq_reports_data_fetcher_direct_traffic_for_account_bps.sh_845_300_2008_900_'
| keep in, name, time
|(
//
// show the raw data
//
ts.split_chart -title "raw data"
//
// show a weekly curve fit to part of the data after preroll
//
| seasonal.weekly -in in -out 'weekly'
| filter time > preroll | ts.split_chart_end -title "weekly fit";
//
// show a daily curve fit to the weekly residual
//
seasonal.deweekly -in in -out 'deweekly'
| seasonal.daily -in 'deweekly' -out 'daily'
| filter time > preroll | ts.split_chart_end -title "daily fit after weekly";
//
// normalize the daily residual (assumes roughly gaussian or at least symmetric)
//
anomaly.normalize_weekly -in in -out in
| filter time > preroll | ts.split_chart_end -title "residual";
//
// apply a robust estimate of variance to the residual to mark outliers
//
anomaly.outlier_chart_weekly -in in -every :h: -after preroll -sigma outlier -title "direct traffic"
)
}
run
// median absolute deviation: robust streaming estimates of mean and variation
//
// MAD de-means the data: it measures deviation from the center of the data,
// as opposed to raw absolute variation about zero (MAV).
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/math.juttle' as math;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/sources.juttle' as sources;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mav.juttle' as mav;
//
// medians
//
export sub median_every(in, from, to, every, out) {
(
reduce -every every -over every -from from -to to _median_tmp = percentile(in, 0.5)
| pace -every :0s: -from from;
merge;
) | join -onchange 2
| put *out = _median_tmp | remove _median_tmp
}
export sub median(in, over, out) {
put -over over _median_tmp = percentile(in, 0.5)
| put *out = _median_tmp
| remove _median_tmp
}
export sub median_by(in, over, out, by) {
put -over over _median_tmp = percentile(in, 0.5) by by
| put *out = _median_tmp
| remove _median_tmp
}
export sub demedian_every(in, from, to, every, out) {
median_every -in in -from from -to to -every every -out '_demedian_tmp'
| put *out = *in - _demedian_tmp
| remove _demedian_tmp
}
export sub demedian(in, over, out) {
put -over over _demedian_tmp = percentile(in, 0.5)
| put *out = math.isFinite(_demedian_tmp) ? *in - _demedian_tmp : 0
| remove _demedian_tmp
}
export sub demedian_by(in, over, out, by) {
put -over over _demedian_tmp = percentile(in, 0.5) by by
| put *out = math.isFinite(_demedian_tmp) ? *in - _demedian_tmp : 0
| remove _demedian_tmp
}
//
// standard deviation approximated as scaled MAD if data is N(mu, sigma)
//
export sub sd_every(in, from, to, every, out) {
demedian_every -in in -from from -to to -every every -out '_mad_tmp'
| mav.sd_every -in '_mad_tmp' -from from -to to -every every -out out
| remove _mad_tmp
}
export sub sd(in, over, out) {
demedian -in in -over over -out '_mad_tmp'
| mav.sd -in '_mad_tmp' -over over -out out
| remove _mad_tmp
}
export sub sd_by(in, over, out, by) {
demedian_by -in in -over over -out '_mad_tmp' -by by
| mav.sd_by -in '_mad_tmp' -over over -out out -by by
| remove _mad_tmp
}
//
// normalization based on MAD
//
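// (these produce a streaming robust z-score: the de-medianed value divided by
// the scaled-MAD sigma estimate from sd above, with non-finite ratios mapped to 0.)
//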
export sub normalize_every(in, from, to, every, out) {
demedian_every -in in -from from -to to -every every -out '_normalize_demedian_tmp'
| sd_every -in in -from from -to to -every every -out '_normalize_mad_tmp'
| put *out = math.finite(_normalize_demedian_tmp / _normalize_mad_tmp, 0)
| remove _normalize_demedian_tmp, _normalize_mad_tmp
}
export sub normalize(in, over, out) {
demedian -in in -over over -out '_normalize_demedian_tmp'
| sd -in in -over over -out '_normalize_mad_tmp'
| put *out = math.finite(_normalize_demedian_tmp / _normalize_mad_tmp, 0)
| remove _normalize_demedian_tmp, _normalize_mad_tmp
}
export sub normalize_by(in, over, out, by) {
demedian_by -in in -over over -out '_normalize_demedian_tmp' -by by
| sd_by -in in -over over -out '_normalize_mad_tmp' -by by
| put *out = math.finite(_normalize_demedian_tmp / _normalize_mad_tmp, 0)
| remove _normalize_demedian_tmp, _normalize_mad_tmp
}
//
// coefficient of quartile variation (aka quartile coefficient of dispersion,
// https://en.wikipedia.org/wiki/Quartile_coefficient_of_dispersion)
//
// This is a robust analogue to the coefficient of variation
// (stddev/mean). Dividing by CQV will standardize data to a mean of
// 1 while preserving ratios about the mean, for use with nonnormal
// "ratio" data -- positive values, generally away from zero. The
// scaling by the mean gives a dimensionless measure of relative
// variation.
//
// We call it cv because you use it like coefficient of variation
// (but you know it's CQV because you got it from this module...)
//
// export sub cv_every(in, from, to, every, out) {
// (
// reduce -every every -over every -from from -to to _cv_q1 = percentile(in, 0.25), _cv_q3 = percentile(in, 0.75)
// | put *out = (_cv_q1 != null && _cv_q3 != null) ? ((_cv_q3 - _cv_q1) / (_cv_q3 + _cv_q1)) : 0
// | pace -every :0s: -from from;
// merge;
// ) | join -onchange 2
// | remove _cv_q1, _cv_q3
// }
// export sub cv(in, over, out) {
// put -over over _cv_q1 = percentile(in, 0.25), _cv_q3 = percentile(in, 0.75)
// | put *out = (_cv_q1 != null && _cv_q3 != null) ? ((_cv_q3 - _cv_q1) / (_cv_q3 + _cv_q1)) : 0
// | remove _cv_q1, _cv_q3
// }
// export sub cv_by(in, over, out, by) {
// put -over over _cv_q1 = percentile(in, 0.25), _cv_q3 = percentile(in, 0.75) by by
// | put *out = (_cv_q1 != null && _cv_q3 != null) ? ((_cv_q3 - _cv_q1) / (_cv_q3 + _cv_q1)) : 0
// | remove _cv_q1, _cv_q3
// }
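//
// worked example: a window whose quartiles are q1 == 50 and q3 == 150 would get
// cv == (150 - 50) / (150 + 50) == 0.5, whatever the units of the data.
//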
//
// demo: de-medianing and normalizing.
//
export sub run() {
const start = :2014-01-01: ;
const dur = :1w: ;
const durS = Duration.toString(dur);
sources.bumpy_cpu -from start -to start + dur -every :20m:
| (
median_every -in 'cpu' -from start -to start + dur -every dur -out 'median-every-'+durS
| median_every -in 'cpu' -from start -to null -every :6h: -out 'median-every-6h'
| median -over dur -in 'cpu' -out 'streaming-median-'+durS
| split
| @timechart -title "medians" -display.dataDensity 0;
demedian_every -in 'cpu' -from start -to start + dur -every dur -out 'demedian-every-'+durS
| demedian_every -in 'cpu' -from start -to null -every :6h: -out 'demedian-every-6h'
| demedian -over dur -in 'cpu' -out 'streaming-demedian-'+durS
| split
| @timechart -title "demedians" -display.dataDensity 0;
sd_every -in 'cpu' -from start -to start + dur -every dur -out 'mad-every-'+durS
| sd_every -in 'cpu' -from start -to null -every :6h: -out 'mad-every-6h'
| sd -over dur -in 'cpu' -out 'streaming-mad-'+durS
| split
| @timechart -title "mads" -display.dataDensity 0;
normalize_every -in 'cpu' -from start -to start + dur -every dur -out 'normalize-'+durS
| normalize_every -in 'cpu' -from start -to null -every :6h: -out 'normalize-6h'
| normalize -over dur -in 'cpu' -out 'streaming-normalize-'+durS
| split
| @timechart -title "normalization" -display.dataDensity 0;
)
}
run
// Magneto: 18 days of magnetometer readings from https://data.sparkfun.com/streams/dZqRNRLywmiO5AnQj03D
//
// These arrive as mag_x, mag_y and mag_z readings on one point every
// 30 seconds. The readings eventually develop sharp daily peaks
// around 16:00-18:00, but only after the big spike halfway through the
// data set, which gives our daily deseasonalizer a workout.
//
// This example also shows the use of the split processor at ingress,
// and *_by functions throughout the body to operate on mag_x/y/z as
// separate streams without having to filter them individually (or
// even know the field names).
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mad.juttle' as mad;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/anomaly.juttle' as anomaly;
export sub run() {
const from = :2014-11-30T12:35:53.880Z:;
const to = :2014-12-18T09:33:48.104Z:;
const preroll = from + :3d:; // training window
const outlier = 3; // alert on 3 sigma values
const title = 'centered readings';
source 'https://gist.githubusercontent.com/welch/520529dfd6bce2e3dadf/raw/fda2322bce7b3672abc30727396685ba7948bed4/magneto50k.json'
| split
// values arrive every 30 seconds, downsample to half-hour for speed. The earth just doesn't change all that quickly.
| ts.downsample_by -in 'value' -every :30m: -by 'name'
| ts.chart_by -title 'magnetometer data' -by 'name'
// center the data so we can better see the variation in the charts (not actually needed for outlier detection)
| mad.demedian_by -over :2d: -in 'value' -out 'value' -by 'name'
// generate outlier charts for each of the series
| ts.chart_by -title title -by 'name'
| anomaly.outlier_chart_daily_by -in 'value' -every :30m: -after preroll -sigma outlier -by 'name' -title title
}
run
//
// built-in demos of de-trending, de-seasonalization, and robust outlier detection modules
// Each module is importable for use, but also contains a standalone demo named run()
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/trend.juttle' as trend;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mad.juttle' as mad;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/seasonal.juttle' as seasonal;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/magneto.juttle' as magneto;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/twitter.juttle' as twitter;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/pricklepants.juttle' as pricklepants;
export sub run() {
twitter.run; // twitter anomaly series, robust outlier detection
//magneto.run; // magnetometer readings, robust outlier detection
//seasonal.run; // daily and weekly variation removal
//trend.run; // rate-of-change estimation using linear regression
//mad.run; // robust standard deviations with MAD
//pricklepants.run; // a disaster -- count data is not normally distributed
}
run
// math -- juttle math utilities
//
//
// inv2x2:
// invert a 2x2 matrix | a, b ; c, d | (row major order)
//
export function inv2x2(a, b, c, d) {
var det = (a * d - b * c) ;
return [d / det, -b / det, -c / det, a / det] ;
}
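//
// worked example: inv2x2(2, 1, 1, 1) has determinant 1 and returns [1, -1, -1, 2],
// the inverse of | 2, 1 ; 1, 1 | in the same row-major order.
//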
//
// isFinite:
// verify x is a good, red-blooded number:
// not null, not NaN, not infinity
//
export function isFinite(x) {
return (x == x && x != null && x != x + 1);
}
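//
// (x == x rejects NaN, x != null rejects null, and x != x + 1 rejects
// +/-Infinity, since Infinity + 1 == Infinity.)
//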
//
// finite:
// replace unpleasant values with a specified default
//
export function finite(x, default) {
return isFinite(x) ? x : default;
}
export function round(number, decimals) {
return Math.round(number * Math.pow(10,decimals)) / Math.pow(10,decimals);
}
export function roundStr(number, decimals) {
return Number.toString(round(number, decimals));
}
// median absolute value: robust streaming estimates of variation for non-normal data
// MAV does not de-mean the data: it measures absolute variation rather than
// deviation from the mean (MAD).
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/stdj.juttle' as stdj;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/math.juttle' as math;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/sources.juttle' as sources;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
//
// median absolute value. saves you the cost of de-meaning for
// centered data. We call it SD because you use it like standard deviation
// (but you know it's MAV because you got it from this module...)
//
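// (the 1.48 multiplier below is the usual MAD-to-sigma correction: for centered,
// normally distributed data the median of |x| is about 0.6745 * sigma, and
// 1 / 0.6745 ~= 1.4826, so scaling the median absolute value by ~1.48 turns it
// into an estimate of sigma.)
//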
export sub sd_every(in, from, to, every, out) {
(
put _adev = Math.abs(*in)
| reduce -every every -over every -from from -to to _mav_tmp = percentile(_adev, 0.5)
| pace -every :0s: -from from;
merge;
) | join -onchange 2
| put *out = _mav_tmp * 1.48 | remove _mav_tmp, _adev
}
export sub sd(in, over, out) {
put _adev = Math.abs(*in)
| put -over over _mav_tmp = percentile(_adev, 0.5)
| put *out = _mav_tmp * 1.48
| remove _mav_tmp, _adev
}
export sub sd_by(in, over, out, by) {
put _adev = Math.abs(*in)
| put -over over _mav_tmp = percentile(_adev, 0.5) by by
| put *out = _mav_tmp * 1.48
| remove _mav_tmp, _adev
}
//
// normalization based on MAV magnitude
//
export sub normalize_every(in, from, to, every, out) {
sd_every -in in -from from -to to -every every -out '_normalize_mav_tmp'
| put *out = math.finite(*in / _normalize_mav_tmp, 0)
| remove _normalize_mav_tmp
}
export sub normalize(in, over, out) {
sd -in in -over over -out '_normalize_mav_tmp'
| put *out = math.finite(*in / _normalize_mav_tmp, 0)
| remove _normalize_mav_tmp
}
export sub normalize_by(in, over, out, by) {
sd_by -in in -over over -out '_normalize_mav_tmp' -by by
| put *out = math.finite(*in / _normalize_mav_tmp, 0)
| remove _normalize_mav_tmp
}
//
// demo
//
export sub run() {
const start = :2014-01-01: ;
const dur = :1w: ;
const durS = Duration.toString(dur);
sources.bumpy_cpu -from start -to start + dur -every :20m:
| (
sd_every -in 'cpu' -from start -to start + dur -every dur -out 'mav-every-'+durS
| sd_every -in 'cpu' -from start -to null -every :6h: -out 'mav-every-6h'
| sd -over dur -in 'cpu' -out 'streaming-mav-'+durS
| split
| @timechart -title "mavs" -display.dataDensity 0;
normalize_every -in 'cpu' -from start -to start + dur -every dur -out 'normalize-'+durS
| normalize_every -in 'cpu' -from start -to null -every :6h: -out 'normalize-6h'
| normalize -over dur -in 'cpu' -out 'streaming-normalize-'+durS
| split
| @timechart -title "normalization" -display.dataDensity 0;
)
}
run
// Pricklepants:
// analysis of distances run by Sir Charles Pricklepants (https://data.sparkfun.com/streams/MGGgb6qr39tlRVo5V8o0)
//
// Readings over a week roughly every 5 minutes, with large gaps
// (implying 0 data?) Our MAD-SD scheme does a terrible job here
// because this is count data, not normally distributed, and not even
// something that tends to symmetry after centering.
//
import "stdj.juttle" as stdj;
import "ts.juttle" as ts;
import "anomaly.juttle" as anomaly;
import "seasonal.juttle" as seasonal;
export sub run() {
const from = :2014-08-18T05:00:12.237Z:;
const to = :2014-08-24T13:57:33.784Z:;
const in = 'distanceInCm';
const preroll = from + :3d:; // training window
const outlier = 3; // alert on 3 sigma values
const title = "Sir Charles sprints:";
source 'https://gist.githubusercontent.com/welch/d283a8cb3c940b7b5030/raw/63802316c2df6650e0207c4c33a14db5b822a01c/pricklepants.json'
// resample this data to fill in the gaps with 0's. Don't downsample, they are counts!
| reduce -every :10m: resampled = sum(in)
| ts.split_chart -title title
| anomaly.outlier_chart_daily -in 'resampled' -every :h: -after preroll -sigma outlier -title title
}
run
// model repeating features ("seasons") in a series, at varying intervals and resolutions
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/stdj.juttle' as stdj;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/math.juttle' as math;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/sources.juttle' as sources;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/interp.juttle' as interp;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/trend.juttle' as trend;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/mad.juttle' as mad;
//
// fit_coeffs: Fit a periodic series of control points that define a best-fit piecewise cubic curve
// for a batch or window of points.
//
// NOTE: This must be called with -acc true so that model state persists across epochs.
//
// parameters:
// in -- field to fit
// period -- duration of one cycle.
// N -- number of intervals to subdivide period for piecewise fit.
// initial_field -- if nonnull, field containing an array of forecast control values from upstream
//
// returns vector [A, len, period, params]
//
export reducer fit_coeffs(in, period, N, initial_field) {
const interval = period / N;
const initial_gain = 1.0, min_gain = .1, gain_decay = 0.9;
var t0, n_pts, epoch, n_epoch;
var pts = [], dpts = [];
var gain;
var initialized = false;
var PROD_2991 ;
function fill(dst, value, n) {
if (n > 0) {
n = n - 1;
dst[n] = value;
return fill(dst, value, n);
} else {
return dst;
}
}
function copy(dst, src, n) {
if (n > 0) {
n = n - 1;
dst[n] = src[n];
return copy(dst, src, n);
} else {
return dst;
}
}
function macc(dst, scale, src, n) {
if (n > 0) {
n = n - 1;
dst[n] = dst[n] + src[n] * scale;
return macc(dst, scale, src, n);
} else {
return dst;
}
}
function initialize(time, initial) {
// initial values here aren't very important because an initial gain of 1.0
// means we'll install a best-fit to the first period. subsequent lower gains
// will allow some drift over time.
PROD_2991 = fill(pts, 0, N);
PROD_2991 = fill(dpts, 0, N);
if (initial != null) {
PROD_2991 = copy(pts, initial[0], N);
gain = initial[3][0] / gain_decay ;
} else {
gain = initial_gain / gain_decay;
}
t0 = time + :0s:; // XXX remove any epsilon so align does the right thing
epoch = Date.align(time, period, t0);
n_epoch = 0;
n_pts = 0;
initialized = true;
}
function advance_state(step) {
// gradient descent on the accumulated error.
PROD_2991 = macc(pts, step, dpts, N);
PROD_2991 = fill(dpts, 0, N);
}
function update() {
var err, Wy, i_y, W, this_epoch;
var time = *'time' + :0s:; // XXX remove epsilon so Date.align does the right thing.
if (*in == null) {
PROD_2991 = stdj.warnf("fit_coeffs sees no value for "+in);
}
if (!initialized) {
var initial_value = (initial_field != null)? *initial_field : null;
PROD_2991 = initialize(time, initial_value);
}
this_epoch = Date.align(time, period, t0);
if (epoch != this_epoch) {
epoch = this_epoch;
n_epoch = n_epoch + 1;
gain = Math.max(gain * gain_decay, min_gain);
if (n_pts > 0) {
PROD_2991 = advance_state(gain * N / n_pts) ;
n_pts = 0;
}
}
n_pts = n_pts + 1;
// evaluate the seasonal curve at this time
// XXX avoid the first point of the epoch until we've sorted out epsilon-wrap.
Wy = interp.project_cubic(time, pts, N, period);
err = *in - Wy[0]; // actual - expected seasonal value at this time
i_y = Wy[1]; // index into control point array
W = Wy[2]; // weights of control points at this time
// distribute the error over the control point subset for this time.
// error from successive updates will accumulate in dpts until the period epoch,
// and then advance_state() will absorb them.
dpts[ i_y ] = dpts[ i_y ] + err * W[0];
dpts[(i_y+1) % N] = dpts[(i_y+1) % N] + err * W[1];
dpts[(i_y+2) % N] = dpts[(i_y+2) % N] + err * W[2];
dpts[(i_y+3) % N] = dpts[(i_y+3) % N] + err * W[3];
}
function result() {
return [pts, N, period, [gain], n_pts, n_epoch];
}
function expire() {
// never expire data or reset this reducer
}
}
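//
// (each arriving point spreads its error err over the four control points active
// at its phase, weighted by the Catmull-Rom weights W; at each period boundary
// advance_state() applies the accumulated corrections scaled by gain * N / n_pts,
// with gain decaying from 1.0 toward min_gain -- one damped gradient step per cycle.)
//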
//
// evaluate the piecewise cubic curve represented by coeffs at the given time.
// coeffs are as returned by fit_coeffs.
//
export function value(t, coeffs, default) {
return math.finite(interp.cubic(t, coeffs[0], coeffs[1], coeffs[2]), default);
}
//
// compute seasonal coefficients
// if non-null, -from and -to let you retroactively put the final fitted
// coefficients onto the points from that range.
//
export sub fit(in, from, to, period, out) {
const N = 30; // number of spline segments per period
(
// XXX -over is meaningless, needed so we can specify -from and -to.
// reducer state is maintained and continually adjusted as points arrive
reduce -acc true -over :1s: -from from -to to
__coeffs = fit_coeffs(in, period, N, '_coeffs')
| pace -every :0s: -from from ;
merge;
) | join -onchange 2
| filter __coeffs != null // PROD-2850
| put *out = __coeffs
| remove __coeffs
}
//
// seasonal:
// estimate seasonal component of a metric series.
// parameters:
// in -- field to seasonalize
// from, to -- if non-null, restricts fitting to a historic range
// period -- duration of one cycle.
// out -- output field for seasonal estimate on each point
//
export sub seasonal_over(in, from, to, period, out) {
fit -from from -to to -period period -in in -out '_seasonal_coeffs_tmp'
| put *out = value(time, _seasonal_coeffs_tmp, *in)
| remove _seasonal_coeffs_tmp
}
export sub seasonal(in, period, out) {
const N = 30; // number of spline segments per period
put *out = value(time, fit_coeffs(in, period, N, '_coeffs'), *in)
}
export sub seasonal_by(in, period, out, by) {
const N = 30; // number of spline segments per period
put *out = value(time, fit_coeffs(in, period, N, '_coeffs'), *in) by by
}
export sub daily(in, out) {
const period = :d:;
const N = 24; // every hour
put *out = value(time, fit_coeffs(in, period, N, '_coeffs'), *in)
}
export sub daily_by(in, out, by) {
const period = :d:;
const N = 24; // every hour
put *out = value(time, fit_coeffs(in, period, N, '_coeffs'), *in) by by
}
export sub weekly(in, out) {
const period = :w:;
const N = 28; // every 6 hours
put *out = value(time, fit_coeffs(in, period, N, '_coeffs'), *in)
}
export sub weekly_by(in, out, by) {
const period = :w:;
const N = 28; // every 6 hours
put *out = value(time, fit_coeffs(in, period, N, '_coeffs'), *in) by by
}
//
// deseasonal:
// de-seasonalize points by subtracting the fitted seasonal curve
// as each point arrives.
// parameters:
// in -- field to de-seasonalize
// from, to -- if non-null, restricts fitting to a historic range (deseasonal_over only)
// period -- duration of one cycle
// out -- output field for deseasonalized value on each point
//
export sub deseasonal_over(in, from, to, period, out) {
seasonal_over -from from -to to -period period -in in -out out
| put *out = *in - *out
}
export sub deseasonal(in, period, out) {
const N = 30; // number of spline segments per period
put *out = *in - value(time, fit_coeffs(in, period, N, '_coeffs'), *in)
}
export sub deseasonal_by(in, period, out, by ) {
const N = 30; // number of spline segments per period
put *out = *in - value(time, fit_coeffs(in, period, N, '_coeffs'), *in) by by
}
export sub dedaily(in, out) {
const period = :d:;
const N = 24; // every hour
put *out = *in - value(time, fit_coeffs(in, period, N, '_coeffs'), *in)
}
export sub dedaily_by(in, out, by) {
const period = :d:;
const N = 24; // every hour
put *out = *in - value(time, fit_coeffs(in, period, N, '_coeffs'), *in) by by
}
export sub deweekly(in, out) {
const period = :w:;
const N = 28;
put *out = *in - value(time, fit_coeffs(in, period, N, '_coeffs'), *in)
}
export sub deweekly_by(in, out, by) {
const period = :w:;
const N = 28;
put *out = *in - value(time, fit_coeffs(in, period, N, '_coeffs'), *in) by by
}
//
// squash repeated daily variations
//
export sub squash_daily(in, out) {
mad.demedian -over :2d: -in in -out '_squash_tmp'
| dedaily -in '_squash_tmp' -out out
| remove _squash_tmp
}
export sub squash_daily_about(in, out, mean) {
put _squash_tmp = *in - *mean
| dedaily -in '_squash_tmp' -out out
| remove _squash_tmp
}
export sub squash_daily_by(in, out, by ) {
mad.demedian_by -over :2d: -in in -out '_squash_tmp' -by by
| dedaily_by -in '_squash_tmp' -out out -by by
| remove _squash_tmp
}
export sub squash_daily_about_by(in, out, by, mean ) {
put _squash_tmp = *in - *mean
| dedaily_by -in '_squash_tmp' -out out -by by
| remove _squash_tmp
}
//
// squash trend and repeated weekly and daily variations
//
export sub squash_weekly(in, out) {
mad.demedian -over :w: -in in -out '_squash_t'
// trend.detrend -in in -over :w: -out '_squash_t'
| deweekly -in '_squash_t' -out '_squash_w'
| dedaily -in '_squash_w' -out out
// | dedaily -in '_squash_t' -out '_squash_w'
// | deweekly -in '_squash_w' -out out
| remove _squash_t, _squash_w
}
export sub squash_weekly_about(in, out, mean) {
put _squash_t = *in - *mean
| deweekly -in '_squash_t' -out '_squash_w'
| dedaily -in '_squash_w' -out out
| remove _squash_t, _squash_w
}
export sub squash_weekly_by(in, out, by) {
mad.demedian_by -over :w: -in in -out '_squash_t' -by by
// trend.detrend_by -in in -over :w: -out '_squash_t' -by by
| deweekly_by -in '_squash_t' -out '_squash_w' -by by
| dedaily_by -in '_squash_w' -out out -by by
| remove _squash_t, _squash_w
}
//
// demo: daily periodics.
// Note: yeah, it's slow! 7 weeks of 10-minute data will do that to your forecaster.
//
export sub run() {
const from = :2014-01-01: ;
const dur = :7w: ;
const to = from + :7w:;
const halfto = from + :2w:;
sources.bumpy_cpu -from from -to from + dur -every :h:
| (
seasonal_over -in 'cpu' -from from -to halfto -period :w: -out 'half-fit-weekly'
| seasonal -in 'cpu' -period :w: -out 'streaming-weekly'
| ts.split_chart -title "simulated cpu, weekly seasonality correction";
squash_weekly -in 'cpu' -out 'squashed_weekly'
| ts.split_chart -title "fitted weekly variation removed";
seasonal_over -in 'cpu' -from from -to halfto -period :d: -out 'half-fit-daily'
| seasonal -in 'cpu' -period :d: -out 'streaming-daily'
| ts.split_chart -title "simulated cpu, daily seasonality correction (confused by the weekends)";
squash_daily -in 'cpu' -out 'squashed_daily'
| ts.split_chart -title "fitted daily variation removed";
) | stdj.end
}
run
// squash_daily using trend.mean_every -every :2d: crapped out
// sources -- simulated juttle sources for demos and tests
//
//
// a square wave of given period and duty cycle (0..1)
//
export function square(time, from, over, duty) {
const dt = time - Date.new(0);
return (((dt % over) - ((dt + over*duty) % over))/ over + duty);
}
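//
// (with -over :10s: and -duty 0.5, square() is 0 for the first five seconds of
// each period and 1 for the last five; in general it is 0 for the first
// (1 - duty) of the period and 1 for the final duty fraction, scaled by amp
// in emit_square.)
//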
export sub emit_square(from, to, every, over, amp, duty, out) {
emit -from from -to to -every every | put *out = square(time, from, over, duty) * amp
}
//
// a ramp of given period (-over)
//
export function ramp(time, from, over, out) {
return ((time - from) % over) / over;
}
export sub emit_ramp(from, to, every, over, amp, out) {
emit -from from -to to -every every | put *out = ramp(time, from, over, out) * amp
}
//
// a sine wave of given period (-over)
//
export function sine(time, from, over) {
return Math.sin((time - from) / over * 2 * Math.PI);
}
export sub emit_sine(from, to, every, over, amp, out) {
emit -from from -to to -every every | put *out = sine(time, from, over) * amp;
}
//
// weekend_factor: a calendar-based scale factor that drops to 0.6 on Saturdays
// (and the adjacent Friday-night/Monday-morning hours) and 0.3 on Sundays.
//
function weekend_factor(time) {
if ((Date.get(time,'h') > 22 && Date.get(time, 'e') == 5)
|| Date.get(time, 'e') == 6
|| (Date.get(time,'h') < 6 && Date.get(time, 'e') == 1)) {
return 0.6 ;
} else if (Date.get(time, 'e') == 0) {
return 0.3 ;
} else {
return 1.0;
}
}
//
// bumpy_cpu:
// a 10-minute-ish cpu metric with noise and daily periodicity but no trend
//
export sub bumpy_cpu(from, to, every) {
read -demo 'cdn' -nhosts 4 -dos .3 -dos_dur :15s: -daily 3
-from from -to to -every every
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value" * weekend_factor(time)
| keep cpu, time
}
//
// ripple_cpu:
// a 10-second cpu metric with noise and daily periodicity but no trend
//
export sub ripple_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -daily 0
-ripple .5 -ripple_dur 60 -ripple_levels [-0.25, -0.25, 0, 0, 0, 0, 0.5]
-from from -to to -every :10s:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value"
| keep cpu, time
}
//
// sparkfun: import a sparkfun feed and recover numbers from strings. this is a hack.
//
export sub sparkfun(hash) {
source -timeField "timestamp" "https://data.sparkfun.com/output/"+hash+".json"
| split
| put value = Number.fromString(value)
| join
}
export sub run() {
// example:
const start = :2014-01-01: ;
const dur = :1h: ;
bumpy_cpu -from :-2d: -to :now: -every :20m: | @timechart -title "2-day cpu variation"
;
ripple_cpu -from start -to start+dur
| @timechart -title "hourly cpu variation" -display.dataDensity 0
}
run
// stdj -- standard juttle utilities
//
// warnf: display the given message as a runtime warning
// (hack implementation throws an error, pending a proper PROD-7487)
//
export function warnf(msg) {
var warning = Date.new('"'+msg+'"');
}
//
// warn: inline, send a warning message to a logger if assert is false or null.
// The assert field will be removed, for the caller's convenience.
//
export sub warn(assert, msg) {
(
put PROD_2850 = *assert
| filter (PROD_2850 == null) or (PROD_2850 == false)
| put warning = msg
| remove PROD_2850, assert
| @logger -title "WARNINGS";
merge;
)
}
//
// log: inline logging tap
//
export sub log(title) {
(
@logger -title title -display.ticks true -display.times true;
merge;
)
}
//
// end: a do-nothing sink, used to cap a series of inline log or chart
// subs having pass-through merge flows
//
export sub end() {
filter time == null | @logger -title "TheEnd" -display.height 1
}
//
// previous: return the value of the previous point's field. aka next-to-last.
//
export reducer previous(in, default) {
var current = default;
var last = default;
function update() {
last = current;
current = *in;
}
function result() {
return last;
}
}
export sub Demo() {
emit -limit 10 -every :1ms: | put N = count()
| (merge ; put assert = N!=5 | warn -assert 'assert' -msg "lookout, N is five")
| @logger -title "my output"
}
Demo
// model trends in a series via linear regression.
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/math.juttle' as math;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/sources.juttle' as sources;
//
// fit_coeffs: Linear regression vs time, for a batch or window of points.
// parameters:
// y -- field to fit
// t0 -- time origin for first fitted line
// initial -- if non-null, field containing initial values.
// reducer will do nothing until it receives a point with this field set,
// then it will initialize using the field and begin normal operation.
// returns vector [b, m, t0, n]
// b -- yintercept, m: slope (per second), n: number of points fit.
//
export reducer fit_coeffs(y, initial) {
var t0, n, T, Y, T2, TY; // accumulated regression state
var initialized = false;
function update() {
var time = *'time';
if (!initialized && initial == null) {
// default initialization from first point
n = T = Y = T2 = TY = 0;
t0 = time;
initialized = true;
} else if (!initialized && initial != null && *initial != null) {
// initialize using upstream state in initial
t0 = (*initial)[2];
n = (*initial)[3];
T = (*initial)[4];
Y = (*initial)[5];
T2 = (*initial)[6];
TY = (*initial)[7];
initialized = true;
}
if (time != null) {
var t = Duration.seconds(time - t0) ;
T = T + t;
Y = Y + *y;
T2 = T2 + t * t;
TY = TY + t * *y;
n = n + 1;
}
}
function expire() {
var time = *'time';
if (initialized && time != null) {
var t = Duration.seconds(time - t0) ;
T = T - t;
Y = Y - *y;
T2 = T2 - t * t;
TY = TY - t * *y;
n = n - 1;
}
}
function result() {
var inv = math.inv2x2(T2, T, T, n) ;
var m = inv[0] * TY + inv[1] * Y;
var b = inv[2] * TY + inv[3] * Y;
return [b, m, t0, n, T, Y, T2, TY];
}
function reset() {
initialized = false;
}
}
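//
// (result() solves the least-squares normal equations
// | T2, T ; T, n | * | m ; b | == | TY ; Y |
// with math.inv2x2, where T, Y, T2, TY are the running sums of t, y, t*t and t*y
// maintained by update() and rolled back by expire() for windowed use.)
//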
//
// value:
// evaluate the trendline represented by coeffs at the given time
//
export function value(t, coeffs, default) {
return math.finite(coeffs[0] + Duration.seconds(t - coeffs[2]) * coeffs[1], default);
}
//
// compute trend coefficients
//
export sub fit_every(in, from, to, every, out) {
(
reduce -every every -over every -from from -to to _trend_coeffs = fit_coeffs(in, null)
| pace -every :0s: -from from ;
merge;
) | join -onchange 2
| filter _trend_coeffs != null // PROD-2850
| put *out = _trend_coeffs
| remove _trend_coeffs
}
//
// trend:
// streaming linear fit over a trailing window of points
// and put its value on the incoming point. A new fit is computed at each point
// as it arrives, with no output delay.
//
// parameters:
// in -- input field
// every,over -- trailing window to fit trend over
// out -- output field for projected line's value
//
export sub trend_every(in, from, to, every, out) {
fit_every -from from -to to -every every -in in -out out
| put *out = value(time, *out, *in)
}
export sub trend(in, over, out) {
put -over over PROD_7488=true, *out = value(time, fit_coeffs(in, null), *in)
| remove PROD_7488
}
//
// detrend:
// de-trend intervals of points by subtracting the MSE linear fit
// as each point arrives.
// parameters:
// in -- field to de-trend
// every,over -- data window for fitting
// out -- output field for detrended value on each point
//
export sub detrend_every(in, from, to, every, out) {
trend_every -from from -to to -every every -in in -out out
| put *out = *in - *out
}
export sub detrend(in, over, out) {
put -over over PROD_7488=true, *out = *in - value(time, fit_coeffs(in, null), *in)
| remove PROD_7488
}
export sub detrend_by(in, over, out, by) {
put -over over PROD_7488=true, *out = *in - value(time, fit_coeffs(in, null), *in) by by
| remove PROD_7488
}
//
// rate:
// robustly estimate rate of change by fitting trendlines to a short moving
// window of points as each point arrives.
//
// parameters:
// in -- field to rate-estimate
// every,over -- time window for rate estimator (longer values give smoother derivatives)
// out -- output field for rate (in/second) on each point
//
export sub rate_every(in, from, to, every, out) {
fit_every -from from -to to -every every -in in -out out
| put *out = (*out)[1];
}
export sub rate(in, over, out) {
put -over over PROD_7488=true, *out = math.finite(fit_coeffs(in, null)[1], null)
| remove PROD_7488
}
//
// change:
// estimate the amount of change over an interval due to a linear trend over the interval.
// parameters:
// in -- field to rate-estimate
// every,over -- time window for rate estimator (longer values give smoother derivatives)
// out -- output field for amount change in this interval due to linear trend.
//
export sub change_every(in, from, to, every, out) {
rate_every -from from -to to -every every -in in -out out
| put *out = *out * Duration.seconds(every)
}
export sub change(in, over, out) {
put -over over PROD_7488=true, *out = math.finite(fit_coeffs(in, null)[1] * Duration.seconds(over),null)
| remove PROD_7488
}
//
// mean:
// compute the average over the interval.
// parameters:
// in -- field to average
// every,over -- time window for averaging
// out -- output field for the interval's mean value on each point
//
export sub mean_every(in, from, to, every, out) {
(
reduce -every every -over every -from from -to to _mean_tmp = avg(in)
| pace -every :0s: -from from;
merge;
) | join -onchange 2
| put *out = _mean_tmp | remove _mean_tmp
}
export sub mean(in, over, out) {
put -over over _mean_tmp = avg(in)
| put *out = _mean_tmp
| remove _mean_tmp
}
export sub demean_every(in, from, to, every, out) {
mean_every -in in -from from -to to -every every -out '_demean_tmp'
| put *out = *in - _demean_tmp
| remove _demean_tmp
}
export sub demean(in, over, out) {
put -over over _demean_tmp = *in - avg(in)
| put *out = _demean_tmp
| remove _demean_tmp
}
export sub demean_by(in, over, out, by) {
put -over over _demean_tmp = *in - avg(in) by by
| put *out = _demean_tmp
| remove _demean_tmp
}
//
// demo: de-trending and de-meaning
// fit trendlines to successive batches of points in various ways,
// over long intervals, for de-trending prior to de-seasonalizing and
// anomaly detection. superimpose them on the original data.
//
export sub run() {
const start = :2014-01-01: ;
const dur = :1w: ;
const durS = Duration.toString(dur);
sources.bumpy_cpu -from start -to start + dur -every :20m:
| (
mean_every -in 'cpu' -from start -to start + dur -every dur -out 'mean-every-'+durS
| mean_every -in 'cpu' -from start -to null -every :6h: -out 'mean-every-6h'
| mean -over dur -in 'cpu' -out 'streaming-mean-'+durS
| split
| @timechart -title "means" -display.dataDensity 0;
demean_every -in 'cpu' -from start -to start + dur -every dur -out 'demean-every-'+durS
| demean_every -in 'cpu' -from start -to null -every :6h: -out 'demean-every-6h'
| demean -over dur -in 'cpu' -out 'streaming-demean-'+durS
| split
| @timechart -title "demeans" -display.dataDensity 0;
trend_every -in 'cpu' -from start -to start + dur -every dur -out 'trend-'+durS
| trend_every -in 'cpu' -from start -to null -every :6h: -out 'trend-6h'
| trend -over dur -in 'cpu' -out 'streaming-trend-'+durS
| split
| @timechart -title "trends" -display.dataDensity 0;
detrend_every -in 'cpu' -from start -to start + dur -every dur -out 'detrend-every-'+durS
| detrend_every -in 'cpu' -from start -to null -every :6h: -out 'detrend-6h'
| detrend -over dur -in 'cpu' -out 'streaming-detrend-'+durS
| split
| @timechart -title "detrends" -display.dataDensity 0;
)
;
//
// demo: use change to highlight rate-of-change anomalies
//
sources.ripple_cpu -from start -to start + :1h:
| change_every -in 'cpu' -from start -to start + :1h: -every :h: -out 'change-every-h'
| change_every -in 'cpu' -from start -to null -every :60s: -out 'change-every-m'
| change -in 'cpu' -over :60s: -out 'streaming-change-m'
| split
| @timechart -title "changes" -display.dataDensity 0
}
run
// ts -- juttle timeseries utilities
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/stdj.juttle' as stdj;
//
// downsample: thin a series of values by retaining, in each interval, whichever extreme (max or min) has the larger magnitude
// parameters:
// in -- name of value field
// every -- interval of each downsample
//
export sub downsample(in, every) {
reduce -every every _downsample_max = max(in), _downsample_min = min(in)
| put *in = (Math.abs(_downsample_max) >= Math.abs(_downsample_min)) ? _downsample_max : _downsample_min
| remove _downsample_min, _downsample_max
}
//
// downsample_by: thin multiple series of values identified by the named field
// parameters:
// in -- name of value field
// every -- interval of each downsample
// by -- name of field identifying each series.
//
export sub downsample_by(in, every, by) {
reduce -every every _downsample_max = max(in), _downsample_min = min(in) by by
| put *in = (Math.abs(_downsample_max) >= Math.abs(_downsample_min)) ? _downsample_max : _downsample_min
| remove _downsample_min, _downsample_max
}
//
// lag: return the value of a field from N points ago.
//
// Note: this must be run as a windowed reducer, eg, reduce -every :h: -over :h:
//
// (running this as a windowed reducer means its reset() function will be called
// at the beginning of each new interval rather than the reducer being torn down and
// rebuilt. This allows the reducer to propagate values across intervals.)
//
// Note 2: if you want to lag by an amount of time rather than a number of
// points, use a windowed first()
//
export reducer lag(field, N, initial) {
var lagged = [];
var i = 0, n = 0;
function update() {
lagged[i] = *field;
n = Math.min(n + 1, N + 1);
i = (i + 1) % (N + 1);
}
function result() {
if (n < N + 1) {
return initial;
} else {
return lagged[i];
}
}
function reset() {
// never reset
}
}
//
// delta: return the difference between the current value of a field
// and that from the previous point.
//
// Note: this must be run as a windowed reducer, eg, reduce -every :h: -over :h:
//
// (running this as a windowed reducer means its reset() function will be called
// at the beginning of each new interval rather than the reducer being torn down and
// rebuilt. This allows the reducer to propagate values across intervals.)
//
// Note 2: if you want a delta over an amount of time rather than a number of
// points, use a windowed (last() - first())
//
export reducer delta(in) {
var last = null;
var this = null;
function update() {
last = this;
this = *in;
}
function result() {
return (last != null) ? this - last : this - this;
}
function reset() {
// don't reset
}
}
//
// chart_by: an inline chart. if by is null, the chart will guess which field
// it should use (and may do a poor job of it)
export sub chart_by(by, title) {
(
@timechart -title title -id title -display.dataDensity 0 -keyField by;
merge;
)
}
export sub chart_by_every(by, title, every) {
(
@timechart -title title -id title -display.dataDensity 0 -keyField by -display.duration every;
merge;
)
}
//
// split_chart: inline chart that includes a split. Send it points having
// multiple value fields on them to see each field plotted
// as its own series.
//
export sub split_chart(title) {
(
split | @timechart -title title -id title -display.dataDensity 0;
merge;
)
}
export sub split_chart_every(title, every) {
(
split | @timechart -title title -id title -display.dataDensity 0 -display.duration every;
merge;
)
}
export sub split_chart_end(title) {
split | @timechart -title title -id title -display.dataDensity 0;
}
export sub split_chart_range(title, from, to) {
(
split | filter time > from time < to |
@timechart -title title -id title -display.dataDensity 0;
merge;
)
}
//
// dual_chart_by: an inline chart with two vertical axes for overlaying
// a differently scaled series
// parameters:
// by -- name of field holding series names (including secondary)
// secondary -- name of secondary series (all others go to primary axis)
//
export sub dual_chart_by(title, by, secondary) {
(
@timechart -id title -o {
keyField: by,
title: title,
display: { dataDensity: 0 },
yScales: { secondary: { } },
series: [
{ name: secondary, yScale: 'secondary' }
]
};
merge;
)
}
//
// split_dual_chart: an inline split chart with two vertical axes for overlaying
// a differently scaled series
// parameters:
// secondary -- name of secondary series (all others go to primary axis)
//
export sub split_dual_chart(title, secondary) {
(
split | @timechart -id title -o {
keyField: 'name',
title: title,
display: { dataDensity: 0 },
yScales: { secondary: { } },
series: [
{ name: secondary, yScale: 'secondary' }
]
};
merge;
)
}
// split_dual_chart_by: an inline split chart with two vertical axes for overlaying
// a differently scaled series
// parameters:
// by -- name of field holding series names (including secondary)
// secondary -- name of secondary series (all others go to primary axis)
//
export sub split_dual_chart_by(title, by, secondary) {
(
split by | @timechart -id title -o {
keyField: [by,'name'],
title: title,
display: { dataDensity: 0 },
yScales: { secondary: { } },
series: [
{ name: secondary, yScale: 'secondary' }
]
};
merge;
)
}
export sub run() {
emit -limit 100 -every :.1s:
| put c = count(), x = c * ((c % 2 == 0)? 1 : -1)
| downsample -in "x" -every :.2s:
| put c = count(), z = c/2
| split_dual_chart -title 'testing' -secondary 'c'
| stdj.end
}
run
// Twitter:
//
// Apply robust outlier detection to the twitter anomaly series
//
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/ts.juttle' as ts;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/seasonal.juttle' as seasonal;
import 'https://gist.githubusercontent.com/welch/3f7b696beab6ba1b55bb/raw/anomaly.juttle' as anomaly;
export sub run() {
const from = :1980-09-25T14:01:00Z:;
const to = :1980-10-05T13:01:00Z:;
const preroll = from + :3d:; // training window
const in = 'count';
const outlier = 3; // alert on 3 sigma values
const title = "twitter anomalies";
source ("https://gist.githubusercontent.com/welch/a34b41e57db2075b5c4a/raw/"+
"f215975aa432cc5ea254769f3ebef77806d2aa44/twitter-anomaly.json")
// twitter data is at 1-minute resolution. we don't need that to spot historic anomalies, so downsample for speed.
| ts.downsample -in in -every :10m:
| ts.split_chart -title title
| (
anomaly.normalize_daily -in in -out in
| filter time > preroll | ts.split_chart_end -title "daily normalized";
anomaly.outlier_chart_daily -in in -every :30m: -after preroll -sigma outlier -title title;
// anomaly.outlier_chart_forecast_daily -in in -every :30m: -after preroll -sigma outlier -title title+" with forecast";
)
}
run