Skip to content

Instantly share code, notes, and snippets.

@chrisa
Created February 20, 2010 19:02
Show Gist options
  • Select an option

  • Save chrisa/309839 to your computer and use it in GitHub Desktop.

Select an option

Save chrisa/309839 to your computer and use it in GitHub Desktop.
public class FirehoseFn extends BaseOperation implements Function {
public FirehoseFn() {
super(2, new Fields("daterange", "duration"));
}
public void operate( FlowProcess flowProcess, FunctionCall functionCall ) {
TupleEntry arguments = functionCall.getArguments();
Tuple result = new Tuple();
String datemillisBytes = (String) arguments.get(0);
Long datemillis = Bytes.toLong(datemillisBytes.getBytes());
Long range = (datemillis / 5000) * 5000;
result.add(range);
String durationBytes = (String) arguments.get(1);
BigDecimal duration = Bytes.toBigDecimal(durationBytes.getBytes());
result.add(duration.doubleValue());
functionCall.getOutputCollector().add(result);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment