Created
January 29, 2012 13:45
-
-
Save friso/1698892 to your computer and use it in GitHub Desktop.
Step 3+4: a single iteration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class IterateJob { | |
public static int run(String input, String output, int maxIterations) { | |
boolean done = false; | |
int iterationCount = 0; | |
while (!done) { | |
Scheme sourceScheme = new TextDelimited(new Fields("partition", "source", "list"), "\t"); | |
Scheme sinkScheme = new TextDelimited(new Fields("partition", "source", "list"), "\t"); | |
//SNIPPED SOME BOILERPLATE... | |
Pipe iteration = new Pipe("iteration"); | |
//For each input record, create a record per node (step 3) | |
iteration = new Each(iteration, new FanOut()); | |
//GROUP BY node ORDER BY partition DESCENDING | |
iteration = new GroupBy( | |
iteration, | |
new Fields("node"), | |
new Fields("partition"), true); | |
//For every group, create records with the largest partition | |
iteration = new Every( | |
iteration, | |
new MaxPartitionToTuples(), | |
Fields.RESULTS); | |
//GROUP BY source node ORDER BY partition DESCENDING (step 4) | |
iteration = new GroupBy( | |
iteration, | |
new Fields("source"), | |
new Fields("partition"), | |
true); | |
//For every group, re-create the adjacency list | |
//with the largest partition | |
//This step also updates the counter | |
iteration = new Every( | |
iteration, | |
new MaxPartitionToAdjacencyList(), | |
Fields.RESULTS); | |
//SNIPPED SOME BOILERPLATE... | |
//Grab the counter value from the flow | |
long updatedPartitions = flow.getFlowStats().getCounterValue( | |
MaxPartitionToAdjacencyList.COUNTER_GROUP, | |
MaxPartitionToAdjacencyList.PARTITIONS_UPDATES_COUNTER_NAME); | |
//Are we done? | |
done = updatedPartitions == 0 || iterationCount == maxIterations - 1; | |
iterationCount++; | |
} | |
return 0; | |
} | |
//Context object for keeping state in the aggregator | |
private static class MaxPartitionToAdjacencyListContext { | |
int source; | |
int partition = -1; | |
List<Integer> targets; | |
public MaxPartitionToAdjacencyListContext() { | |
this.targets = new ArrayList<Integer>(); | |
} | |
} | |
@SuppressWarnings("serial") | |
private static class MaxPartitionToAdjacencyList | |
extends BaseOperation<MaxPartitionToAdjacencyListContext> | |
implements Aggregator<MaxPartitionToAdjacencyListContext> { | |
//Constants used for counter names | |
public static final String PARTITIONS_UPDATES_COUNTER_NAME = "Partitions updates"; | |
public static final String COUNTER_GROUP = "graphs"; | |
public MaxPartitionToAdjacencyList() { | |
super(new Fields("partition", "source", "list")); | |
} | |
@Override | |
public void start( | |
FlowProcess flowProcess, | |
AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) { | |
//Initialize context | |
MaxPartitionToAdjacencyListContext context = new MaxPartitionToAdjacencyListContext(); | |
context.source = aggregatorCall.getGroup().getInteger("source"); | |
aggregatorCall.setContext(context); | |
} | |
@Override | |
public void aggregate( | |
FlowProcess flowProcess, | |
AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) { | |
//Re-create the gaph and update counter on changes | |
MaxPartitionToAdjacencyListContext context = aggregatorCall.getContext(); | |
TupleEntry arguments = aggregatorCall.getArguments(); | |
int partition = arguments.getInteger("partition"); | |
if (context.partition == -1) { | |
context.partition = partition; | |
} else if (context.partition > partition) { | |
flowProcess.increment(COUNTER_GROUP, PARTITIONS_UPDATES_COUNTER_NAME, 1); | |
} | |
//Ignore the source node when creating the list | |
int node = arguments.getInteger("node"); | |
if (node != context.source) { | |
context.targets.add(node); | |
} | |
} | |
@Override | |
public void complete( | |
FlowProcess flowProcess, | |
AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) { | |
//Output the list | |
MaxPartitionToAdjacencyListContext context = aggregatorCall.getContext(); | |
Tuple result = new Tuple( | |
context.partition, | |
context.source, | |
StringUtils.joinObjects(",", context.targets)); | |
aggregatorCall.getOutputCollector().add(result); | |
} | |
} | |
@SuppressWarnings({ "serial", "rawtypes", "unchecked" }) | |
private static class MaxPartitionToTuples | |
extends BaseOperation | |
implements Buffer { | |
public MaxPartitionToTuples() { | |
super(new Fields("partition", "node", "source")); | |
} | |
@Override | |
public void operate(FlowProcess flowProcess, BufferCall bufferCall) { | |
Iterator<TupleEntry> itr = bufferCall.getArgumentsIterator(); | |
int maxPartition; | |
TupleEntry entry = itr.next(); | |
maxPartition = entry.getInteger("partition"); | |
emitTuple(bufferCall, maxPartition, entry); | |
while (itr.hasNext()) { | |
entry = itr.next(); | |
emitTuple(bufferCall, maxPartition, entry); | |
} | |
} | |
private void emitTuple(BufferCall bufferCall, int maxPartition, TupleEntry entry) { | |
Tuple result = new Tuple( | |
maxPartition, | |
entry.getInteger("node"), | |
entry.getInteger("source")); | |
bufferCall.getOutputCollector().add(result); | |
} | |
} | |
@SuppressWarnings({ "serial", "rawtypes" }) | |
private static class FanOut extends BaseOperation implements Function { | |
public FanOut() { | |
super(3, new Fields("partition", "node", "source")); | |
} | |
@Override | |
public void operate(FlowProcess flowProcess, FunctionCall functionCall) { | |
TupleEntry args = functionCall.getArguments(); | |
int partition = args.getInteger("partition"); | |
int source = args.getInteger("source"); | |
Tuple result = new Tuple(partition, source, source); | |
functionCall.getOutputCollector().add(result); | |
for (String node : args.getString("list").split(",")) { | |
result = new Tuple(partition, Integer.parseInt(node), source); | |
functionCall.getOutputCollector().add(result); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment