Skip to content

Instantly share code, notes, and snippets.

@chrisa
Created February 20, 2010 19:01
Show Gist options
  • Select an option

  • Save chrisa/309838 to your computer and use it in GitHub Desktop.

Select an option

Save chrisa/309838 to your computer and use it in GitHub Desktop.
/**
 * Command-line entry point that assembles and runs a Cascading flow named "average".
 *
 * <p>The flow reads the "duration" and "datemillis" columns (column family "basic", keyed by
 * "date") through an {@code HBaseTap} for "candrewsdev", passes them through
 * {@code FirehoseFn}, groups the result on "daterange", averages "duration" per group into
 * an "average" field, and writes to the local path {@code /tmp/average.out}, replacing any
 * previous output.
 *
 * <p>NOTE(review): {@code FirehoseFn} is defined elsewhere; it presumably emits both
 * "daterange" and "duration", since the downstream pipes select those fields — confirm.
 */
public class MovingAverage {

    /**
     * Builds the source, sink and pipe assembly, connects them into a flow and blocks until
     * the flow has completed.
     *
     * @param args command-line arguments; not used
     * @throws Exception if any step of building or running the flow fails
     */
    public static void main(String[] args) throws Exception {
        // Source: HBase scheme with "date" as the key and one column family, "basic".
        Fields keyFields = new Fields("date");
        String[] familyNames = {"basic"};
        Fields[] valueFields = new Fields[] {new Fields("duration", "datemillis")};
        Tap source = new HBaseTap("candrewsdev", new HBaseScheme(keyFields, familyNames, valueFields));

        // Sink: local file holding (daterange, average) tuples; overwritten on every run.
        Tap sink = new Lfs(new Fields("daterange", "average"), "/tmp/average.out", SinkMode.REPLACE);

        // Hand ("datemillis", "duration") of every tuple to FirehoseFn.
        Function fn = new FirehoseFn();
        Pipe parse = new Each("moving-average", new Fields("datemillis", "duration"), fn);

        // Group on "daterange" and average "duration" within each group.
        Pipe assembly = new GroupBy(parse, new Fields("daterange"));
        Aggregator average = new Average(new Fields("average"));
        assembly = new Every(assembly, new Fields("duration"), average);

        // Debug filter on the aggregated stream, kept from the original for inspection.
        Filter debug = new Debug(true);
        assembly = new Each(assembly, debug);

        // The properties carry the application-jar setting, so they must be handed to the
        // connector; a connector built without them never sees that setting.
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, MovingAverage.class);
        FlowConnector flowConnector = new FlowConnector(properties);

        Flow flow = flowConnector.connect("average", source, sink, assembly);
        flow.complete();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment