@welch
Last active August 29, 2015 14:21
// bit-o-juttle: juttle examples

Bit-O-Juttle

Little bits of Juttle: standalone examples, best practices, and worst practices.

// Alerting with context:
//
// This example shows how to annotate an alert event with recent lines from
// a logging stream, to give context for the alert. This is accomplished with
// a custom reducer that tails the log stream, and a join of the current tail
// against each alert event as it occurs.
//
///////////////////////////////////////////////////////////////////////////////
//
// simulated alert and logging streams
//
sub alerts() {
    emit -every :5s: -limit 3
    | put event="alert"
}
sub logs() {
    emit -every :250ms: -limit 50
    | put message="at "+Date.toString(time)+" the count was "+Number.toString(count())
}
//
// tail a field in a stream of points
//
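// (tail_field keeps a ring buffer of the most recent N values of the
// named field: update() overwrites the oldest slot as points arrive,
// and result() unrolls the buffer into a list in arrival order.)
//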
reducer tail_field(field, N) {
    var context = [];
    var c_i = 0, c_n = 0;
    function update() {
        context[c_i] = *field;
        c_n = Math.min(c_n + 1, N);
        c_i = (c_i + 1) % N;
    }
    function get_tail(tail, j, c_j) {
        if (tail == null) {
            tail = []; j = 0; c_j = (c_i - c_n + N) % N;
        }
        if (j < c_n) {
            tail[j] = context[c_j];
            return get_tail(tail, j + 1, (c_j + 1) % N);
        } else {
            return tail;
        }
    }
    function result() {
        return get_tail(null, null, null);
    }
}
//
// Live stream demo: tap the alert and logging streams, and join them
// whenever an alert fires.
//
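// (-onchange 1 below asks join to emit a joined point whenever input #1,
// the alert stream, produces a new point, rather than on every log point.)
//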
(
    alerts;
    logs | put context = tail_field('message', 5);
)| join -onchange 1
| @table
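//
// tail_field is an ordinary reducer, so it can also run windowed. A
// usage sketch (commented out), reporting the last 3 messages once
// per second instead of joining against alerts:
//
// logs | reduce -every :1s: context = tail_field('message', 3) | @table
//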
reducer normal(mean, sigma) {
    // return a Box-Muller approximate normal with given mean and stddev.
    // this is written as a custom "reducer" because Box-Muller values come
    // in pairs, and we want to save one for next time.
    //
    var leftover = null;
    function update() {}
    function result() {
        if (leftover != null) {
            var result = mean + sigma * leftover;
            leftover = null;
            return result;
        } else {
            var u = 2 * Math.random() - 1;
            var v = 2 * Math.random() - 1;
            var r = u*u + v*v;
            if (r == 0 || r >= 1) {
                // out of bounds, try again
                return result();
            }
            var c = Math.sqrt(-2 * Math.log(r) / r);
            leftover = u * c;
            return mean + sigma * v * c;
        }
    }
}
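//
// usage sketch (commented out; assumes Juttle's built-in avg reducer):
// a quick sanity check that samples center on the requested mean.
//
// emit -limit 1000 -every :.01s: | put x = normal(3, 1) | reduce avg(x) | @table
//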
reducer xdp(y, P) {
    // given a list of data values and corresponding percentiles, return the
    // derivative. This numerically differentiates the EDF to get an approximate
    // PDF. It can be a bumpy ride.
    var Y = [];
    function update() {
        Y = *y;
    }
    function iter(X, dPdX, n) {
        if (X == null || X[n+1] == null) {
            return dPdX;
        } else if (n > 0) {
            dPdX[n] = [(X[n+1] + X[n]) / 2, (P[n+1] - P[n]) / (X[n+1] - X[n])];
        }
        return iter(X, dPdX, n+1);
    }
    function result() {
        return iter(Y, [0], 0);
    }
}
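//
// (each output pair is a centered difference: at the midpoint
// (X[n+1]+X[n])/2, the density estimate is the change in percentile
// rank over the change in value, (P[n+1]-P[n])/(X[n+1]-X[n]).)
//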
export const PCT = [0, .01, .02, .03, .04, .05, .06, .07, .1, .15, .2, .25, .3, .35, .4, .45, .5, .55, .6, .65, .7, .75, .8, .85, .9, .92, .93, .94, .95, .96, .97, .98, .99, 1];
export sub run() {
    emit -limit 100000 -every :.1s:
    | put x=normal(6,1), y=normal(7,2)
    | (@scatterchart -controlField 'x' -valueField 'y' -title 'X vs Y'
        -display.duration :10s: -display.markerOpacity 0.95
        -xScales.primary.minValue 0 -xScales.primary.maxValue 10
        -yScales.primary.minValue 0 -yScales.primary.maxValue 10
      ; merge)
    | reduce -acc true -every :1s: px = percentile(x, PCT), py = percentile(y, PCT)
    | put dpx = xdp(px, PCT), dpy = xdp(py, PCT)
    | split dpx, dpy
    | put x=value[0], dp=value[1]
    | @scatterchart -controlField 'x' -valueField 'dp' -title 'density' -keyField 'name'
        -display.duration :3s:
        -yScales.primary.minValue 0 -yScales.primary.maxValue 0.6
}
run
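// variant of the density demo above: a faster point rate, shifted
// normals, and a 3-second percentile window.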
reducer normal(mean, sigma) {
    // return a Box-Muller approximate normal with given mean and stddev.
    // this is written as a custom "reducer" because Box-Muller values come
    // in pairs, and we want to save one for next time.
    //
    var leftover = null;
    function update() {}
    function result() {
        if (leftover != null) {
            var result = mean + sigma * leftover;
            leftover = null;
            return result;
        } else {
            var u = 2 * Math.random() - 1;
            var v = 2 * Math.random() - 1;
            var r = u*u + v*v;
            if (r == 0 || r >= 1) {
                // out of bounds, try again
                return result();
            }
            var c = Math.sqrt(-2 * Math.log(r) / r);
            leftover = u * c;
            return mean + sigma * v * c;
        }
    }
}
reducer xdp(y, P) {
    // given a list of data values and corresponding percentiles, return the
    // derivative. This numerically differentiates the EDF to get an approximate
    // PDF. It can be a bumpy ride.
    var Y = [];
    function update() {
        Y = *y;
    }
    function iter(X, dPdX, n) {
        if (X == null || X[n+1] == null) {
            return dPdX;
        } else if (n > 0) {
            dPdX[n] = [(X[n+1] + X[n]) / 2, (P[n+1] - P[n]) / (X[n+1] - X[n])];
        }
        return iter(X, dPdX, n+1);
    }
    function result() {
        return iter(Y, [0], 0);
    }
}
export const PCT = [0, .01, .02, .03, .04, .05, .06, .07, .1, .15, .2, .25, .3, .35, .4, .45, .5, .55, .6, .65, .7, .75, .8, .85, .9, .92, .93, .94, .95, .96, .97, .98, .99, 1];
export sub run() {
    emit -limit 100000 -every :.01s:
    | put x=normal(10,1), y=normal(11,2)
    | (@scatterchart -controlField 'x' -valueField 'y' -title 'X vs Y'
        -display.duration :3s: -display.markerOpacity 0.5; merge)
    | reduce -acc true -every :3s: px = percentile(x, PCT), py = percentile(y, PCT)
    | put dpx = xdp(px, PCT), dpy = xdp(py, PCT)
    | split dpx, dpy
    | put x=value[0], dp=value[1]
    | @scatterchart -controlField 'x' -valueField 'dp' -title 'density' -keyField 'name'
        -display.duration :3s:
}
run
// last_last: remember the last value from the previous reducer batch/interval.
//
// Note: this must be run as a windowed reducer, e.g., reduce -every :h: -over :h:
//
// (running this as a windowed reducer means its reset() function will be called
// at the beginning of each new interval rather than the reducer being torn down and
// rebuilt. This allows the reducer to propagate values across intervals.)
//
export reducer last_last(in, initial) {
    var this_last = initial;
    var prior_last = initial;
    function update() {
        this_last = *in;
    }
    function result() {
        return prior_last;
    }
    function reset() {
        prior_last = this_last;
    }
}
export sub run() {
    emit -from :last year: -limit 100 -every :s:
    | put N = count()
    | (
        //
        // estimate the rate of change of N for each 10-second
        // interval (it is 1/second). the input is a monotonic
        // counter, and using the difference of the first and last
        // points works fine.
        //
        reduce -every :10s: rate = (last(N) - first(N)) / Duration.seconds(last(time) - first(time))
        | @logger -title "estimated rate/sec from boundary points";
        //
        // compute the change over each interval. The difference of first
        // and last isn't quite right because it does not see the
        // change between the last point of the previous interval
        // and the first point of the current interval (the rate
        // estimator above also ignores that gap).
        //
        reduce -every :10s: change = last(N) - first(N)
        | @logger -title "incremental change using last-first";
        //
        // compute the change since the last interval using the last_last
        // custom reducer, to start from the prior interval's last point.
        //
        reduce -every :10s: -over :10s: change = last(N) - last_last(N, 0)
        | @logger -title "incremental change using last_last";
        //
        // this can also be done without the custom reducer:
        // first reduce each interval to a single point, then run
        // a double-wide windowed first() and last() to pick up
        // the point from the previous interval. The first change
        // is 0 because there is no earlier value for comparison
        // (last_last() allows an initial count to be specified).
        //
        reduce -every :10s: last_N = last(N)
        | reduce -every :10s: -over :20s: change = last(last_N) - first(last_N)
        | @logger -title "incremental change using a double-window";
    )
}
run
// implement the percentile proc using the percentile reducer
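// (the inner reduce computes the p-quantile q over the stream; joining q
// back onto the original points lets us keep values at or below q and
// take the largest, yielding the percentile as an actual data point.)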
sub my_percentile(field, p) {
    ( reduce q = percentile(field, p);
      merge;
    )| join
    | put below = *field <= q
    | filter below == true
    | sort field -desc
    | head 1
    | remove below, q
}
const p = 0.682;
emit -limit 100 -from 0
| put x = Math.random()
| ( percentile -p p x | put proc="percentile" | remove time;
    my_percentile -field 'x' -p p | put proc="my_percentile";
)
| @logger
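// (both branches should report the same x value, confirming that
// my_percentile agrees with the built-in percentile proc.)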
reducer normal(mean, sigma) {
    // return a Box-Muller approximate normal with given mean and stddev.
    // this is written as a custom "reducer" because Box-Muller values come
    // in pairs, and we want to save one for next time.
    //
    var leftover = null;
    function update() {}
    function result() {
        if (leftover != null) {
            var result = mean + sigma * leftover;
            leftover = null;
            return result;
        } else {
            var u = 2 * Math.random() - 1;
            var v = 2 * Math.random() - 1;
            var r = u*u + v*v;
            if (r == 0 || r >= 1) {
                // out of bounds, try again
                return result();
            }
            var c = Math.sqrt(-2 * Math.log(r) / r);
            leftover = u * c;
            return mean + sigma * v * c;
        }
    }
}
reducer zip(a, b) {
    var A = [], B = [];
    function update() {
        A = *a; B = *b;
    }
    function zip_tail(A, B, C, n) {
        if (A[n] == null) {
            return C;
        }
        C[n] = [A[n], B[n]];
        return zip_tail(A, B, C, n+1);
    }
    function result() {
        return zip_tail(A, B, [], 0);
    }
}
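//
// (zip pairs the nth percentile of x with the nth percentile of y;
// plotting those pairs against each other is a Q-Q plot. Points along
// a straight line indicate distributions with the same shape.)
//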
export const PCT = [0, .01, .05, .1, .2, .3, .4, .5, .6, .7, .8, .9, .95, .99, 1];
export sub run() {
    emit -limit 10000 -every :.005s:
    | put x = normal(0,1), y = x + normal(0,1)
    | (@scatterchart -controlField 'x' -valueField 'y' -title 'X vs Y'
        -display.duration :3s: -display.markerOpacity 0.5; merge)
    | reduce -acc true -every :s: px = percentile(x, PCT), py = percentile(y, PCT)
    | put pxy = zip(px, py)
    | split pxy
    | put x=value[0], y=value[1]
    | @scatterchart -controlField 'x' -valueField 'y' -title 'Q-Q plot, X vs Y'
        -display.duration :3s:
}
run
// simple outer-join example:
// build a small table of users, and outer-join it onto a message stream.
sub users() {
    // "tables" need historic timestamps. Start this one at the epoch.
    emit -start 0 -limit 4
    | put id=count(), username = [null, 'sam', 'bob', 'tim'][id]
    | reduce username = last(username) by id;
}
sub data() {
    // a stream of messages with nonzero ids
    emit -limit 4 | put msg='hi', id=count()
}
(
    users;
    data
) | join -outer 2 id
| put msg = msg + ' ' + ((username != null) ? username : 'anonymous')
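// (-outer 2 makes input #2, the data stream, drive the join: every
// message produces an output whether or not user info has arrived,
// and a null username falls back to 'anonymous'.)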
// streaming outer join demo:
// process a mixed stream of customer info events and purchase events.
// join customer info onto purchases as they occur.
//
//
// fetch demo events from a GIST.
//
sub get_events() {
    // (for real data, you would replace this source command with filtered read commands)
    source "https://gist.githubusercontent.com/welch/85872ad486a56eb9556a/raw/984b1504f7036ff548d59b2a7cb500109779869f/events.json";
}
//
// view the raw data
//
get_events | @table -title "Raw Event Stream";
//
// split the event stream into a customer info stream and a purchases stream
//
(
    //
    // #1. customer info stream.
    //
    get_events | filter event in ['create','update']
    // build a "table" of customer info by remembering the most recent item for each.
    // update this table every second.
    | reduce -acc true -every :s: cust_id = last(cust_id), email = last(email) by cust_id;
    //
    // #2. purchase order stream.
    //
    get_events | filter event = "purchase"
)
| join -outer 2 cust_id // outer join of input #2 on cust_id produces one result for each purchase
| @table -title "Purchases" -columnOrder 'time','purchase_id','cust_id','email'