Skip to content

Instantly share code, notes, and snippets.

@sosuren
Created April 7, 2014 12:04
Show Gist options
  • Save sosuren/10019055 to your computer and use it in GitHub Desktop.
Save sosuren/10019055 to your computer and use it in GitHub Desktop.
Aggregator to populate multiple tuples
package com.deerwalk.das.scrub;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
/**
* Created by slama on 4/7/14.
*/
/**
 * Cascading {@link Aggregator} that emits, for each group, the lexicographically smallest
 * string value read from the lookup field.
 *
 * <p>A {@code null} value is treated as the empty string {@code ""}. As a result, any group
 * containing a {@code null} or empty value yields {@code ""}. Values are compared with
 * {@link String#compareTo(String)}. Exactly one single-element result tuple is emitted per group.
 */
public class MyAggregator extends BaseOperation implements Aggregator {

  /** Field selector used to read each argument value; also the declared output field. */
  private final Fields lookupFields;

  /**
   * Creates the aggregator.
   *
   * @param lookupFields field to read from each argument tuple; it is also passed to
   *     {@code BaseOperation} as this operation's declared output fields
   */
  public MyAggregator(Fields lookupFields) {
    super(1, lookupFields);
    this.lookupFields = lookupFields;
  }

  /** Installs a fresh per-group {@link Context} before the group's first argument arrives. */
  @Override
  public void start(FlowProcess flowProcess, AggregatorCall aggregatorCall) {
    aggregatorCall.setContext(new Context());
  }

  /** Folds the current argument's value into the group's running minimum. */
  @Override
  public void aggregate(FlowProcess flowProcess, AggregatorCall aggregatorCall) {
    Context context = (Context) aggregatorCall.getContext();
    // Read-only access, so use the supplied entry directly instead of copying it per tuple.
    TupleEntry arguments = aggregatorCall.getArguments();

    // Read the value once; normalise null to "" so compareTo never sees a null.
    String raw = arguments.getString(lookupFields);
    String value = (raw == null) ? "" : raw;

    // minStr is null only until the first value of the group has been recorded,
    // because every recorded value is non-null after normalisation.
    if (context.minStr == null || value.compareTo(context.minStr) < 0) {
      context.minStr = value;
    }
  }

  /** Emits a single-element tuple holding the group's minimum value. */
  @Override
  public void complete(FlowProcess flowProcess, AggregatorCall aggregatorCall) {
    Context context = (Context) aggregatorCall.getContext();
    Tuple result = new Tuple();
    result.addString(context.minStr);
    aggregatorCall.getOutputCollector().add(result);
  }

  /** Per-group mutable state; {@code minStr} stays {@code null} until a value is seen. */
  static class Context {
    String minStr = null;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment