
@danielmewes
Created August 23, 2016 16:48
Aggregation changefeed `fold` rewrite

Given a changefeed of the form:

stream.reduce(f).changes({includeInitial: <II>, includeStates: <IS>})

Assume that for the given f we know the following:

  • <f_BASE>: the initial accumulator for f
  • <f_APPLY>: a function from an accumulator and an element of the input stream to a new accumulator
  • <f_UNAPPLY>: the inverse of <f_APPLY> in the accumulator, i.e. a function from an accumulator and a previously applied element to the accumulator without that element's contribution
  • <f_EMIT>: a function that computes the result of the reduction from the current accumulator
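The rewrite relies on one property of these ingredients: because a change can delete or modify any document, not only the most recently added one, removing an arbitrary element's contribution from the full accumulator must give the same accumulator as never having applied that element. The plain-JavaScript sketch below checks this for a list of elements. It models the ingredients as JS functions rather than ReQL terms, and `sumReducer` and `checkRemoval` are hypothetical names used only for illustration.

```javascript
// Plain-JS model of the four ingredients for a hypothetical sum() reduction.
const sumReducer = {
  base: 0,                        // <f_BASE>
  apply: (acc, el) => acc + el,   // <f_APPLY>
  unapply: (acc, el) => acc - el, // <f_UNAPPLY>
  emit: (acc) => acc,             // <f_EMIT>
};

// Returns true if, for every element, unapplying it from the accumulator of the
// whole list equals the accumulator of the list without that element.
// Accumulators are compared via JSON.stringify, so they must be JSON-serializable.
function checkRemoval(f, elements) {
  const fold = (els) => els.reduce((acc, el) => f.apply(acc, el), f.base);
  const all = fold(elements);
  return elements.every((el, i) => {
    const rest = fold(elements.filter((_, j) => j !== i));
    return JSON.stringify(f.unapply(all, el)) === JSON.stringify(rest);
  });
}

console.log(checkRemoval(sumReducer, [3, 5, 7])); // true
```

A reduction such as max fails this check: once the current maximum is removed, the accumulator alone cannot recover the previous one, so no <f_UNAPPLY> exists for it.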

The query can then be rewritten as:

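// Initial values and states are always requested internally; <II> and <IS>
// only control what the emit function passes through.
stream.changes({includeInitial: true, includeStates: true}).fold(
  {f_acc: <f_BASE>, is_initialized: false},
  function(acc, el) {
    var f_acc = acc('f_acc');
    // Remove the old value (updates, deletes), then add the new one (inserts, updates, initial values)
    var new_f_acc = r.branch(el.hasFields("old_val"), <f_UNAPPLY>(f_acc, el('old_val')), f_acc).do(function(un_f_acc) {
        return r.branch(el.hasFields("new_val"), <f_APPLY>(un_f_acc, el('new_val')), un_f_acc);
      });
    // The accumulator reflects the complete input once the 'ready' state has been seen
    var new_is_initialized = acc('is_initialized').or(el.hasFields('state').and(el('state').eq('ready')));
    return {f_acc: new_f_acc, is_initialized: new_is_initialized};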
  },
  {emit: function(old_acc, el, new_acc) {
    var old_f_acc = old_acc('f_acc');
    var new_f_acc = new_acc('f_acc');
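    var old_val = <f_EMIT>(old_f_acc);
    var new_val = <f_EMIT>(new_f_acc);
    // Pass state documents through if requested; with <II>, 'ready' is emitted by emit_initial instead
    var emit_state = r.expr(<IS>).and(el.hasFields('state')).and(r.expr(<II>).not().or(el('state').ne('ready')));
    // After initialization, report every change of the reduction result
    var emit_update = old_acc('is_initialized').and(old_val.ne(new_val));
    // On the transition to initialized, emit the initial result once (followed by 'ready' if <IS>)
    var emit_initial = r.expr(<II>).and(old_acc('is_initialized').not().and(new_acc('is_initialized')));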
    return r.branch(
      emit_state, [el],
      emit_update, [{'old_val': old_val, 'new_val': new_val}],
      emit_initial, r.branch(<IS>, [{'new_val': new_val}, {state: "ready"}], [{'new_val': new_val}]),
      []
    );
  }})
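To see which documents the rewritten feed emits, the fold can be simulated in plain JavaScript over a hand-written list of change events. This is a sketch under assumptions: events are plain objects shaped like RethinkDB changefeed output, a null or missing field counts as absent (matching how `hasFields` treats null values), results are compared with JSON.stringify in place of ReQL's `ne`, and `simulateFold` is a hypothetical name.

```javascript
// Plain-JS simulation of the rewritten fold (hypothetical model, not ReQL).
// A field that is missing or null counts as absent, like ReQL hasFields().
const has = (obj, key) => obj[key] !== undefined && obj[key] !== null;
const same = (a, b) => JSON.stringify(a) === JSON.stringify(b);

function simulateFold(f, events, includeInitial, includeStates) {
  let acc = { f_acc: f.base, is_initialized: false };
  const out = [];
  for (const el of events) {
    // Step: unapply the old value, then apply the new one.
    let fAcc = acc.f_acc;
    if (has(el, "old_val")) fAcc = f.unapply(fAcc, el.old_val);
    if (has(el, "new_val")) fAcc = f.apply(fAcc, el.new_val);
    const newAcc = {
      f_acc: fAcc,
      is_initialized: acc.is_initialized || (has(el, "state") && el.state === "ready"),
    };

    // Emit: the same three conditions, checked in the same order as r.branch.
    const oldVal = f.emit(acc.f_acc);
    const newVal = f.emit(newAcc.f_acc);
    const emitState = includeStates && has(el, "state") &&
      (!includeInitial || el.state !== "ready");
    const emitUpdate = acc.is_initialized && !same(oldVal, newVal);
    const emitInitial = includeInitial && !acc.is_initialized && newAcc.is_initialized;
    if (emitState) {
      out.push(el);
    } else if (emitUpdate) {
      out.push({ old_val: oldVal, new_val: newVal });
    } else if (emitInitial) {
      out.push({ new_val: newVal });
      if (includeStates) out.push({ state: "ready" });
    }
    acc = newAcc;
  }
  return out;
}

// Usage with count():
const count = { base: 0, apply: (a) => a + 1, unapply: (a) => a - 1, emit: (a) => a };
const events = [
  { state: "initializing" },
  { new_val: { id: 1 } },                           // initial value
  { new_val: { id: 2 } },                           // initial value
  { state: "ready" },
  { old_val: null, new_val: { id: 3 } },            // insert
  { old_val: { id: 1 }, new_val: { id: 1, x: 1 } }, // update
  { old_val: { id: 2 }, new_val: null },            // delete
];
console.log(JSON.stringify(simulateFold(count, events, true, true)));
// [{"state":"initializing"},{"new_val":2},{"state":"ready"},{"old_val":2,"new_val":3},{"old_val":3,"new_val":2}]
```

With both flags set, the two initial documents collapse into a single {new_val: 2} followed by the ready state, and the update to document 1 emits nothing because the count does not change.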

For example, for count():

  • <f_BASE> = 0
  • <f_APPLY> = function(acc, el) { return acc.add(1); }
  • <f_UNAPPLY> = function(acc, el) { return acc.sub(1); }
  • <f_EMIT> = function(acc) { return acc; }
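For count(), an update to an existing document unapplies and then reapplies one element, so the count is unchanged and the emit function of the rewrite suppresses it. A minimal plain-JavaScript sketch of the count ingredients and the per-event step, where `step` is a hypothetical helper mirroring the fold's step function rather than ReQL:

```javascript
// Plain-JS model of count() (hypothetical; the real terms are ReQL).
const count = {
  base: 0,
  apply: (acc, el) => acc + 1,   // each added document adds one
  unapply: (acc, el) => acc - 1, // each removed document subtracts one
  emit: (acc) => acc,            // the accumulator already is the result
};

// Hypothetical helper mirroring the fold's step: unapply old_val, then apply new_val.
// `!= null` treats both null and missing fields as absent.
function step(f, acc, change) {
  if (change.old_val != null) acc = f.unapply(acc, change.old_val);
  if (change.new_val != null) acc = f.apply(acc, change.new_val);
  return acc;
}

let acc = count.base;
acc = step(count, acc, { new_val: { id: 1 } }); // insert: 1
acc = step(count, acc, { new_val: { id: 2 } }); // insert: 2
acc = step(count, acc, { old_val: { id: 1 }, new_val: { id: 1, n: 5 } }); // update: still 2
acc = step(count, acc, { old_val: { id: 2 }, new_val: null }); // delete: 1
console.log(count.emit(acc)); // 1
```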

Or for avg():

  • <f_BASE> = {c: 0, sum: 0}
  • <f_APPLY> = function(acc, el) { return {c: acc('c').add(1), sum: acc('sum').add(el) }; }
  • <f_UNAPPLY> = function(acc, el) { return {c: acc('c').sub(1), sum: acc('sum').sub(el) }; }
  • <f_EMIT> = function(acc) { return acc('sum').div(acc('c')); } (this still needs handling for an empty input, where c is 0 and the division is undefined; how to handle that is still to be decided)
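One possible way to handle empty input, which the note above leaves open, is to emit null when c is 0. The plain-JavaScript sketch below models that choice; the null policy is an assumption, not a decided design. In ReQL the same guard could be written as r.branch(acc('c').eq(0), null, acc('sum').div(acc('c'))).

```javascript
// Plain-JS model of avg() (hypothetical; the real terms are ReQL).
// ASSUMPTION: an empty input emits null; the source leaves this policy open.
const avg = {
  base: { c: 0, sum: 0 },
  apply: (acc, el) => ({ c: acc.c + 1, sum: acc.sum + el }),
  unapply: (acc, el) => ({ c: acc.c - 1, sum: acc.sum - el }),
  emit: (acc) => (acc.c === 0 ? null : acc.sum / acc.c),
};

let acc = avg.base;
console.log(avg.emit(acc)); // null (empty input)
acc = avg.apply(acc, 4);
acc = avg.apply(acc, 8);
console.log(avg.emit(acc)); // 6
acc = avg.unapply(acc, 4);
console.log(avg.emit(acc)); // 8
acc = avg.unapply(acc, 8);
console.log(avg.emit(acc)); // null again
```

With floating-point inputs, repeated addition and subtraction can leave a small rounding residue in sum, so a long-running feed may drift slightly from a freshly computed average.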