Skip to content

Instantly share code, notes, and snippets.

@welch
Last active October 20, 2015 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save welch/0c2600f05fbff773591f to your computer and use it in GitHub Desktop.
Save welch/0c2600f05fbff773591f to your computer and use it in GitHub Desktop.
predict: return the unsurprising portion of a metric stream by estimating trend and seasonal effects
// Predict:
// Trend, seasonality, and noise estimation for metric streams.
//
// Predict is a sub which consumes a metric stream and emits a
// prediction stream based on estimated trend, seasonality, and a
// smoothed version of the input. Prediction errors (the difference
// between predict's output and the input) can signal anomalies in the
// stream.
//
// usage: predict [options]
// options:
// -field ("value") Name of metric field to predict
// -over (:w:) Period: the length of one repeating cycle in the input metric.
// -every (default is based on period) interval between emitted prediction points
// -pct (0.5) percentile to retain during initial reduction to every-spaced values
// -nonneg (true) do not allow predictions to become negative.
// -detrend (true) if false, do not remove estimated trend
// -deseason (true) if false, do not remove estimated cyclic effect
// -denoise (true) if false, do not smooth the detrended/deseasoned value
//
// predict is intended to be used with a historic or superquery that
// includes enough history to initialize its estimators. predict can
// begin emitting prediction points almost immediately, though the
// early points will simply be de-meaned. After 2 periods have gone by,
// trend and seasonality (at the resolution of -every) will switch on,
// and after 3 periods have passed all estimators have full windows of
// data.
//
// output:
// each output point contains the following fields:
// *field : average value of input field over -every
// T : portion of field predicted by the trend estimator
// S : portion of field predicted by seasonality
// Y : portion of field predicted by smoothing
// P : predicted value of field, T + S + Y
// E : prediction error, P - field
// Z : normalized error based on the sample stddev over trailing periods
//
// Return the sample Z-score of the specified field.
//
// Parameters:
//
// * field: fieldname
//
// compute an approximate Z-score for an input value, using sample
// estimates of population mean and stddev:
// (value - sample mean) / sample stddev
//
reducer z(field) {
var sum = 0;
var ssum = 0;
var n = 0;
var latest = null;
function update() {
var v = *field;
if (v != null) {
latest = v;
sum = sum + v;
ssum = ssum + v*v;
n = n + 1;
}
}
function result() {
if (n < 2 ) {
return null;
} else {
var mean = sum / n;
var stddev = Math.sqrt(1/(n-1) * (ssum - sum*sum/n));
return (latest - mean) / stddev;
}
}
function expire() {
var v = *field;
if (v != null) {
sum = sum - v;
ssum = ssum - v*v;
n = n - 1;
}
}
};
// Exponential weighted moving average
//
// Parameters:
//
// * field: fieldname
// * alpha: weight of newest value in moving average
// * initial: initial value of the moving average, defaults to first update
// * offset: name of field containing an offset to be applied to the stored value
//
reducer ewma(field, alpha, initial=null, offset=null) {
var value = initial;
function update() {
if (*field != null) {
if (value == null) {
value = *field;
}
value = value + alpha * (*field - value);
if (offset != null && *offset != null) {
value = value - *offset;
}
}
}
function result() {
return value;
}
}
// Return the Nth lagged value of a field
//
// Parameters:
//
// * field: name of value field
// * N: (1) lag
//
reducer L(field, N=1, undef=null) {
var values = [];
var n = 0, i = 0;
var current = null;
function update() {
if (*field != null) {
values[i] = *field;
i = (i + 1) % (N + 1);
if (n <= N) {
n = n + 1;
}
}
}
function result() {
if (n == N + 1) {
return values[i];
} else {
return undef;
}
}
}
// Return a nice subdividing duration, preserving calendarness where possible
//
// Parameters:
//
// * d: duration to subdivide
//
export function subdivide_duration(d) {
if (Duration.as(d, "y") == Math.floor(Duration.as(d, "y"))) {
return :M:;
} else if (Duration.as(d, "M") == Math.floor(Duration.as(d, "M"))) {
return :d:;
} else if (Duration.as(d, "w") == Math.floor(Duration.as(d, "w"))) {
return :6h:;
} else if (Duration.as(d, "d") == Math.floor(Duration.as(d, "d"))) {
return :h:;
} else if (Duration.as(d, "h") == Math.floor(Duration.as(d, "h"))) {
return :5m:;
} else {
return d / 30;
}
}
// estimate trailing trend and median as the median
// duration-over-duration change of all samples in a window of -over
// duration (a variant of the Theil-Sen estimator). This can use up to
// 2 periods of historic data per point, but begins outputting a
// windowed median immediately. trend expects its input to already be
// aggregated -every.
//
// Parameters:
//
// * field: Name of metric field to predict
// * over: Period: length of one repeating cycle in the input metric.
// * every: interval between emitted prediction points
// * revise: if true, include the current value in the slope estimate.
//
// Output:
//
// * T: portion of field predicted by trend
// * M: slope (change per over), or null if no slope was estimated
// (T will have a trailing median value if M is null)
//
sub trend(field, over, every, revise=true) {
const N = Math.round(over / every);
put __bucket = count() % N
| put __per_over = delta(field,null) by __bucket // same bucket, successive periods
| put -over 2 * over
__t0 = first(time) + (last(time) - first(time)) / 2,
__y0 = percentile(field,0.5),
M = __per_over != null ? percentile(__per_over,0.5) : null
| put M = revise ? M : L("M") ?? M // use current or previous M/y0/t0 to predict trend at time
| put __y0 = revise ? __y0 : L("__y0") ?? __y0
| put __t0 = revise ? __t0 : L("__t0") ?? __t0
| put T = (M != null) ? __y0 + M * (time - __t0) / over : __y0
| remove __t0, __y0, __bucket, __per_over
}
// estimate cyclic variation as a moving median of the values over
// successive epochs at a given phase. This can use up to 3 periods
// of historic data per point, and at startup simply replays the most
// recent value. seasonal expects its input to already be de-trended
// and aggregated -every.
//
// Parameters:
//
// * field: Name of metric field to predict
// * over: Period: length of one repeating cycle in the input metric.
// * every: interval between emitted prediction points
// * revise: if true, include the current value in the seasonal estimate.
//
// Output:
//
// * S: portion of field predicted by seasonality
//
sub seasonal(field, over, every, revise) {
const N = Math.round(over / every);
put __bucket = count() % N, __n=count()
| put -over 3 * over S = (__n > 4) ? percentile(field,0.5) : null by __bucket
| put S = revise ? S : L("S") by __bucket // use current or previous seasonal at time
| remove __bucket, __n
}
// windowed median as a less-noisy prediction of mean. the optional
// offset field is examined for a nonzero impulse to be added to the
// windowed values state. This is allows incorporation of a
// discontinuity in the series.
//
// Parameters:
//
// * field: Name of metric field to predict
// * every: interval between emitted prediction points
// * offset: fieldname for an impulse to be added
// * revise: if false, return previous value as the prediction
//
// Output:
//
// * Y : portion of field predicted by smoothing
//
sub level(field, every, offset, revise) {
put -over 3 * every Y = percentile(field, 0.5)
| put Y = revise ? Y : (L("Y") ?? Y)
| put -over 3 * every Y = Y + (sum(offset) ?? 0)
}
// predict the mean of the distribution of next field value at the
// current time from prior values (revise=false), optionally including
// the current field value in the trend and seasonal estimates
// (revise=true). This can use up to 3 periods of historic data per
// point, but can begin producing (noisy) point-to-point estimates
// after a few points.
//
// Parameters:
//
// * field: ("value") Name of metric field to predict
// * over: (:w:) Period: length of one repeating cycle in the input metric.
// * every: (default is based on value of over) interval between emitted prediction points
// * pct: (0.5) percentile to retain during initial reduction to every-spaced values
// * nonneg: (true) do not allow predictions to become negative.
// * detrend: (true) if false, do not remove estimated trend
// * deseason: (true) if false, do not remove estimated cyclic effect
// * denoise: (true) if false, do not smooth the detrended/deseasoned value
// * revise: (true) if false, do not include current value in trend and seasonal estimates
//
// Output:
//
// predict consumes its input stream, and outputs points every -every.
// each output point contains the following fields:
// * *field: percentile value of input field over -every, from -pct (default median)
// * T: portion of field predicted by the trend estimator
// * S: portion of field predicted by seasonality
// * Y: portion of field predicted by smoothing
// * P: predicted value of field, T + S + Y
// * E: prediction error, P - field
// * Z: normalized error based on the sample stddev over trailing periods
//
export sub predict(field = "value", over = :w:, every=null, pct=0.5, nonneg=true,
detrend=true, deseason=true, denoise=true, revise=true) {
const __every = every ?? subdivide_duration(over);
reduce -every __every time= Date.quantize(first(time), __every), *field = percentile(field, pct)
| trend -field field -every __every -over over -revise revise
| put T = detrend ? T : 0
| put __detrend = *field - T
| put __LT = L("T") ?? 0 // before M becomes valid, T is windowed median.
| put __offset = ((M != null && L("M") == null) ? __LT - T : 0)
| seasonal -field '__detrend' -over over -every __every -revise revise
| put S = deseason ? S : 0
| put __offset = __offset - ((S != null && L("S") == null) ? S : 0)
| put __deseason = __detrend - (S ?? 0)
| level -field '__deseason' -every __every -revise revise -offset '__offset'
| put Y = denoise ? Y : (L("__deseason") ?? 0) + __offset
| put P = (S ?? 0) + (T ?? 0) + Y
| put P = nonneg ? Math.max(P, 0) : P
| put E = P - *field
| put -over 3*over Z = z('E')
| remove __LT, __detrend, __deseason, __offset, M
}
//////////////////////////////////////////////////////////////////////////////
// predict demos: trend and seasonality
//
// the following metric sources all work with the default predict interval of :w:
// all reads are from jx3 or j4qa as indicated.
//
input last: duration -default :30 days: -label 'Show this duration:';
export sub live_juttle_runs(dur1=:3M:) {
read -from (:now: + :1d: - dur1) -space 'prod'
event = 'run-juttle' AND
properties_page_currentPage ='https://app.jut.io/#explorer' AND
context_ip != '207.141.12.50'
| reduce -every :h: run_count = count()
| (@timechart -title "Weekly App Juttle Program Runs" -valueField 'run_count' -display.dataDensity 0; put value=run_count)
}
export sub live_pageviews() {
read -from :now: - last -space 'prod' type = 'page' properties_url ~ 'http://docs.jut.io/*'
NOT (context_ip in ['207.141.12.50','54.149.158.203','54.68.207.93','52.10.135.75','54.148.36.156','52.24.101.163'])
| reduce -every :h: pageviews = count(), users = count_unique(anonymousId)
| (@timechart -title "live pageviews" -valueField 'pageviews' -display.dataDensity 0; put value=pageviews)
}
export sub live_users() {
read -from :now: - last -space 'prod' type = 'page' properties_url ~ 'http://docs.jut.io/*'
NOT (context_ip in ['207.141.12.50','54.149.158.203','54.68.207.93','52.10.135.75','54.148.36.156','52.24.101.163'])
| reduce -every :h: pageviews = count(), users = count_unique(anonymousId)
| (@timechart -title "live users" -valueField 'users' -display.dataDensity 0; put value=users)
}
export sub slack_messages() {
// this runs in j4qa rather than jx3
read -from :-M: -space "slack"
type = "message" and user_name != null
| reduce -every :h: name="slack_messages", value = count()
| (@timechart -title "slack messages" -display.dataDensity 0; pass)
}
sub pulse(field, value, start, dur) {
put *field = *field + ((time >= start && time < start + dur) ? value : 0)
}
sub slide(field, value, start, dur) {
put *field = *field + (
(time >= start && time < start + dur)
? ((time - start) / dur) * value
: 0)
}
export sub series(from, to, every, over, trend = 1, season = 10, alpha = 0.5, sigma=0.1, initial=0) {
// generate a series of values having given trend, seasonality, noise, and autocorrelation (ewma alpha)
emit -from from -to to -every every
| put n = count(), dy = (time - Date.new(0)) / over, cycle = Math.sin(dy * 2 * Math.PI)
| put step = sigma * (2 * Math.random() - 1)
| put step = ewma("step", alpha, initial)
| put value = n * trend + season * cycle + step
| slide -field 'value' -value -20 -start from + 5.25 * over -dur 4*every
}
export sub posSeries(from, to, every, over, trend = 1, season = 10, alpha = 0.5, sigma=0.1, initial=0) {
// like series, but keep it positive
emit -from from -to to -every every
| put n = count(), dy = (time - Date.new(0)) / over, cycle = Math.abs(Math.sin(dy * 2 * Math.PI))
| put step = sigma * (2 * Math.random() - 1)
| put step = ewma("step", alpha, initial)
| put value = n * trend + season * cycle + step
| slide -field 'value' -value -20 -start from + 5.25 * over -dur 4*every
| put value = Math.abs(value)
}
// null invocation of predict that uses the previous value as the
// prediction of the next value. not bad.
//
export sub dontPredict(field = "value", over = :w:, every=null, pct=0.5, nonneg=true) {
predict -field field -over over -every every -pct pct -nonneg nonneg
-detrend false -deseason false -denoise false -revise false
}
const over = :w:;
const every = subdivide_duration(over);
// uncomment any one of the following sources:
posSeries -from Date.new(0) -to Date.new(0)+ 8*over -every every -over over -trend .05 -season 20 -alpha 0.5 -sigma 4//live_juttle_runs
//live_pageviews
//live_users
//slack_messages
// | dontPredict
| predict
| (
put name="stderr", value = sigma("E") | @tile;
split value, P, T, S, Y | @timechart -keyField 'name' -display.dataDensity 0;
split E | @timechart -keyField 'name' -display.dataDensity 0;
split Z | @timechart -keyField 'name' -display.dataDensity 0;
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment