Skip to content

Instantly share code, notes, and snippets.

@mlimotte
Created September 19, 2011 15:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mlimotte/1226738 to your computer and use it in GitHub Desktop.
Save mlimotte/1226738 to your computer and use it in GitHub Desktop.
Counters in Cascalog
;;; The majority of this code is copied from the Cascalog source (1.7.0-SNAPSHOT as of 9/17/2011).
;;; It is fragile, because it goes beyond the public API... even using a private
;;; Cascalog function in one case. It can easily break for new versions of Cascalog.
;;;
;;; Provides a deffilterfpop macro. This def op can be used to create a Cascalog operation
;;; which behaves like a Filter, but is also passed an instance of the Cascading
;;; FlowProcess object. FlowProcess can be used to set the Hadoop Status, and to increment
;;; Hadoop Counters.
;; Namespace declaration.
;; - The :require clause is closed before :import starts (the two clauses are
;;   separate arguments to `ns`).
;; - cascading.tuple.Fields is imported because build-predicate-specific
;;   refers to Fields/ALL.
(ns weatherbill.hadoop.cascading
  (:require
    [cascalog
     [workflow :as w]
     [debug :as debug]
     [predicate :as p]])
  (:import
    [cascading.pipe Each]
    [cascading.tuple Fields]
    com.weatherbill.hadoop.ClojureFilterFP))
;; A :filterfp operation counts as higher-order when its metadata carries a
;; truthy :hof? entry. Any arguments after the op itself are ignored.
(defmethod p/hof-predicate? :filterfp
  [op & _]
  (-> op meta :hof?))
;; The default var selector for a :filterfp operation is always :< ,
;; regardless of the arguments passed in.
(defmethod p/predicate-default-var :filterfp
  [& _]
  :<)
;; Var for cascalog.predicate/hof-prepend, looked up by name at load time
;; instead of being referred to directly (see the fragility warning in the
;; file header). The value is nil if the namespace has no such var.
(def hof-prepend
  (ns-resolve (the-ns 'cascalog.predicate) 'hof-prepend))
;; Builds the Cascalog predicate for a :filterfp operation.
;;
;; When `outfields` is non-empty, the op is applied with those fields as its
;; :fn> fields and Fields/ALL as the :> selector; otherwise both are nil.
;; The resulting assembly is wrapped in an `operation` predicate.
;;
;; Fields is written fully qualified so that this method does not depend on
;; cascading.tuple.Fields being imported by the namespace declaration.
;;
;; The second argument and `options` are not used.
(defmethod cascalog.predicate/build-predicate-specific :filterfp
  [op _ hof-args infields outfields options]
  (let [[func-fields out-selector] (if (not-empty outfields)
                                     [outfields cascading.tuple.Fields/ALL]
                                     [nil nil])
        ;; hof-prepend is the var resolved from cascalog.predicate; presumably
        ;; it puts the higher-order args in front of the rest -- TODO confirm.
        assembly (apply op (hof-prepend hof-args infields
                                        :fn> func-fields
                                        :> out-selector))]
    (p/predicate cascalog.predicate/operation assembly infields outfields false)))
(defn inc-counter
  "Adds `amount` to the Hadoop counter identified by `group` and `name` by
  calling the `increment` method of `fp`.

  `fp` is expected to be the FlowProcess object that a deffilterfpop body
  receives as its first argument. Returns whatever `increment` returns."
  [fp group name amount]
  (. fp (increment group name amount)))
;; Var for cascalog.workflow/defop-helper, looked up by name at load time
;; instead of being referred to directly (see the fragility warning in the
;; file header). Used by the deffilterfpop macro to build its expansion.
(def defop-helper
  (ns-resolve (the-ns 'cascalog.workflow) 'defop-helper))
;; TODO: only the plain form (deffilterfpop filter-ok ...) is supported, not
;; the higher-order form (deffilterfpop [filter-ok [arg]] ...).
(defmacro deffilterfpop
  "Defines a filter operation whose function receives the FlowProcess as its
  first argument, followed by the filter's input values. The expansion is
  produced by Cascalog's defop-helper, with filterfp from this namespace as
  the operation type."
  [& op-forms]
  (defop-helper 'weatherbill.hadoop.cascading/filterfp op-forms))
(defn filterfp
  "Returns a function of one argument, the previous pipe, that appends an
  Each pipe running a ClojureFilterFP built from the parsed `args`.

  Throws UnsupportedOperationException when the parsed args contain function
  (output) fields, since a filter produces no output."
  [& args]
  (fn [previous]
    (debug/debug-print "filter" args)
    (let [[in-fields func-fields spec _ stateful] (w/parse-args args)]
      ;; Reject output fields up front, before any pipe is constructed.
      (when func-fields
        (throw (UnsupportedOperationException. "filterfp does not support output args")))
      (Each. previous in-fields (ClojureFilterFP. spec stateful)))))
/**
* The majority of this class is copied from the Cascalog source (1.7.0-SNAPSHOT as of 9/17/2011).
* This is a filter operation, where the FlowProcess object is exposed
*/
package com.weatherbill.hadoop;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.flow.FlowProcess;
import cascalog.ClojureCascadingBase;
import clojure.lang.ISeq;
import cascalog.Util;
/**
 * Cascading filter that passes the current {@link FlowProcess} to the wrapped
 * Clojure function as its first argument, ahead of the tuple values.
 */
public class ClojureFilterFP extends ClojureCascadingBase implements Filter {

    /**
     * Creates the filter.
     *
     * @param fn_spec  function spec, handed unchanged to {@code ClojureCascadingBase}
     * @param stateful stateful flag, handed unchanged to {@code ClojureCascadingBase}
     */
    public ClojureFilterFP(Object[] fn_spec, boolean stateful) {
        super(fn_spec, stateful);
    }

    /**
     * Decides whether the current tuple is removed from the stream.
     *
     * @param flow_process the flow process, prepended to the function's arguments
     * @param filter_call  the call whose argument tuple supplies the remaining arguments
     * @return {@code true} (remove the tuple) when the function result is not truthy
     */
    public boolean isRemove(FlowProcess flow_process, FilterCall filter_call) {
        ISeq tupleArgs = Util.coerceFromTuple(filter_call.getArguments().getTuple());
        // The FlowProcess goes first so the function sees (fp arg1 arg2 ...).
        ISeq argsWithFlowProcess = tupleArgs.cons(flow_process);
        boolean keep = Util.truthy(applyFunction(argsWithFlowProcess));
        return !keep;
    }
}
;;; Here is a sample query, which uses deffilterfpop to update two counters.
;;; For each source row, it runs some QA function on the row and increments either
;;; the "data/ok" counter or the "data/bad" counter.
;; The `cs` alias is needed because the sample below calls
;; cs/deffilterfpop and cs/inc-counter.
(ns sample.query
  (:require [weatherbill.hadoop.cascading :as cs]))
...
;; Keeps a row only when its status is exactly "OK". As a side effect it bumps
;; the "ok" or "bad" counter in the "data" group by 1 for every row seen.
;; `fp` is the FlowProcess supplied to the filter function.
(cs/deffilterfpop filter-ok
  [fp status]
  (let [ok? (= "OK" status)]
    (cs/inc-counter fp "data" (if ok? "ok" "bad") 1)
    ok?))
;; Sample query: reads ?source and ?val from `input`, derives ?status from
;; ?val with some-op, and keeps only the rows accepted by filter-ok (which
;; also updates the counters).
;; NOTE(review): `histories`, `some-op` and `?<-` are not defined in the part
;; of the sample shown here; presumably they come from the elided code and
;; from the Cascalog API -- confirm.
(defn query
[input]
(?<-
;; NOTE(review): ?interval, ?meas and ?history are listed as output variables
;; but none of the predicates below mentions them -- confirm against the
;; real query.
histories [?source ?interval ?meas ?history]
(input ?source ?val)
(some-op ?val :> ?status)
(filter-ok ?status)))
@mlimotte
Copy link
Author

A brittle solution to enable hadoop counters from cascalog queries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment