Skip to content

Instantly share code, notes, and snippets.

@friso
Created January 29, 2012 13:45
Show Gist options
  • Save friso/1698892 to your computer and use it in GitHub Desktop.
Save friso/1698892 to your computer and use it in GitHub Desktop.
Step 3+4: a single iteration
public class IterateJob {
/**
 * Runs the iterative job: each pass of the loop builds the pipe assembly for a
 * single iteration (steps 3 and 4), reads the "Partitions updates" counter from
 * the flow statistics, and decides whether another pass is needed.
 *
 * <p>The loop stops when the counter reads zero or when {@code iterationCount}
 * equals {@code maxIterations - 1}.
 *
 * <p>NOTE(review): the code that builds and runs the flow is elided in the two
 * "SNIPPED" sections; {@code flow} is presumably declared there, and
 * {@code input}/{@code output} are presumably consumed there - confirm.
 * NOTE(review): the cap is tested with {@code ==}, so for
 * {@code maxIterations <= 0} it looks like only a zero counter ends the loop -
 * confirm this is intended.
 *
 * @param input not referenced in the visible code; presumably the source location - confirm
 * @param output not referenced in the visible code; presumably the sink location - confirm
 * @param maxIterations upper bound on the number of passes (see the note above)
 * @return always 0 in the visible code
 */
public static int run(String input, String output, int maxIterations) {
boolean done = false;
int iterationCount = 0;
while (!done) {
//Source and sink share the same tab-delimited layout: partition, source, list
Scheme sourceScheme = new TextDelimited(new Fields("partition", "source", "list"), "\t");
Scheme sinkScheme = new TextDelimited(new Fields("partition", "source", "list"), "\t");
//SNIPPED SOME BOILERPLATE...
Pipe iteration = new Pipe("iteration");
//For each input record, create a record per node (step 3)
iteration = new Each(iteration, new FanOut());
//GROUP BY node ORDER BY partition DESCENDING
iteration = new GroupBy(
iteration,
new Fields("node"),
new Fields("partition"), true);
//For every group, create records with the largest partition
iteration = new Every(
iteration,
new MaxPartitionToTuples(),
Fields.RESULTS);
//GROUP BY source node ORDER BY partition DESCENDING (step 4)
iteration = new GroupBy(
iteration,
new Fields("source"),
new Fields("partition"),
true);
//For every group, re-create the adjacency list
//with the largest partition
//This step also updates the counter
iteration = new Every(
iteration,
new MaxPartitionToAdjacencyList(),
Fields.RESULTS);
//SNIPPED SOME BOILERPLATE...
//Grab the counter value from the flow
//NOTE(review): 'flow' is not declared in the visible code; presumably created in the snipped section above
long updatedPartitions = flow.getFlowStats().getCounterValue(
MaxPartitionToAdjacencyList.COUNTER_GROUP,
MaxPartitionToAdjacencyList.PARTITIONS_UPDATES_COUNTER_NAME);
//Are we done? Stop when nothing was updated, or when this was pass number maxIterations
done = updatedPartitions == 0 || iterationCount == maxIterations - 1;
iterationCount++;
}
//The iteration count is not reported to the caller; the result is a constant
return 0;
}
/**
 * Mutable per-group state carried by {@code MaxPartitionToAdjacencyList}
 * between its start, aggregate and complete calls.
 */
private static class MaxPartitionToAdjacencyListContext {
  /** Source node of the group being aggregated. */
  int source;

  /** Partition adopted for the group; -1 means "no row seen yet". */
  int partition;

  /** Nodes collected for the rebuilt adjacency list. */
  List<Integer> targets = new ArrayList<Integer>();

  public MaxPartitionToAdjacencyListContext() {
    partition = -1;
  }
}
/**
 * Aggregator that turns all rows of one "source" group back into a single
 * adjacency-list record with the fields (partition, source, list).
 *
 * <p>The partition of the first row seen in a group is adopted for the whole
 * group; every later row that carries a smaller partition increments the
 * "Partitions updates" counter in the "graphs" counter group.
 */
@SuppressWarnings("serial")
private static class MaxPartitionToAdjacencyList
    extends BaseOperation<MaxPartitionToAdjacencyListContext>
    implements Aggregator<MaxPartitionToAdjacencyListContext> {

  /** Name of the counter that is incremented for rows with a smaller partition. */
  public static final String PARTITIONS_UPDATES_COUNTER_NAME = "Partitions updates";

  /** Counter group that holds the counter above. */
  public static final String COUNTER_GROUP = "graphs";

  public MaxPartitionToAdjacencyList() {
    super(new Fields("partition", "source", "list"));
  }

  /** Creates fresh state for the group and records the group's source node. */
  @Override
  public void start(
      FlowProcess flowProcess,
      AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) {
    final MaxPartitionToAdjacencyListContext state = new MaxPartitionToAdjacencyListContext();
    state.source = aggregatorCall.getGroup().getInteger("source");
    aggregatorCall.setContext(state);
  }

  /** Folds one row into the group state: partition bookkeeping first, then the node. */
  @Override
  public void aggregate(
      FlowProcess flowProcess,
      AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) {
    final MaxPartitionToAdjacencyListContext state = aggregatorCall.getContext();
    final TupleEntry row = aggregatorCall.getArguments();

    final int rowPartition = row.getInteger("partition");
    final boolean firstRowOfGroup = state.partition == -1;
    if (firstRowOfGroup) {
      // The first row decides the partition of the whole group.
      state.partition = rowPartition;
    } else if (rowPartition < state.partition) {
      // This row arrived with a smaller partition than the adopted one: count it.
      flowProcess.increment(COUNTER_GROUP, PARTITIONS_UPDATES_COUNTER_NAME, 1);
    }

    // The source node itself never appears in its own adjacency list.
    final int neighbour = row.getInteger("node");
    if (neighbour == state.source) {
      return;
    }
    state.targets.add(neighbour);
  }

  /** Emits the rebuilt record, with the collected nodes joined by commas. */
  @Override
  public void complete(
      FlowProcess flowProcess,
      AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) {
    final MaxPartitionToAdjacencyListContext state = aggregatorCall.getContext();
    final Tuple adjacencyRecord = new Tuple(
        state.partition,
        state.source,
        StringUtils.joinObjects(",", state.targets));
    aggregatorCall.getOutputCollector().add(adjacencyRecord);
  }
}
/**
 * Buffer that re-emits every row of a group as (partition, node, source),
 * replacing each row's partition with the partition of the group's first row.
 *
 * <p>In {@code run} the preceding GroupBy sorts each group by partition in
 * descending order, which is why the first row's value is treated as the maximum.
 */
@SuppressWarnings({ "serial", "rawtypes", "unchecked" })
private static class MaxPartitionToTuples
    extends BaseOperation
    implements Buffer {

  public MaxPartitionToTuples() {
    super(new Fields("partition", "node", "source"));
  }

  @Override
  public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
    final Iterator<TupleEntry> rows = bufferCall.getArgumentsIterator();

    // Read the partition off the first row before advancing the iterator.
    TupleEntry current = rows.next();
    final int maxPartition = current.getInteger("partition");

    // Emit the current row, then advance; stop once the group is exhausted.
    while (true) {
      emitTuple(bufferCall, maxPartition, current);
      if (!rows.hasNext()) {
        return;
      }
      current = rows.next();
    }
  }

  /** Writes one output row that carries the group-wide partition. */
  private void emitTuple(BufferCall bufferCall, int maxPartition, TupleEntry entry) {
    final Tuple stamped = new Tuple(
        maxPartition,
        entry.getInteger("node"),
        entry.getInteger("source"));
    bufferCall.getOutputCollector().add(stamped);
  }
}
/**
 * Function that expands one adjacency-list record (partition, source, list)
 * into one (partition, node, source) row per node: first a row for the source
 * node itself, then one row per entry of the comma-separated "list" field.
 *
 * <p>A record whose list is null or empty yields only the self row. Previously
 * such a record failed, because splitting an empty string produces a single
 * empty token that cannot be parsed as an integer.
 */
@SuppressWarnings({ "serial", "rawtypes" })
private static class FanOut extends BaseOperation implements Function {

  public FanOut() {
    super(3, new Fields("partition", "node", "source"));
  }

  /**
   * Emits the self row and one row per listed node.
   *
   * @throws NumberFormatException if a non-empty list entry is not a valid integer
   */
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry args = functionCall.getArguments();
    int partition = args.getInteger("partition");
    int source = args.getInteger("source");

    // Every source node is also a member of its own neighbourhood.
    Tuple result = new Tuple(partition, source, source);
    functionCall.getOutputCollector().add(result);

    // A node without neighbours has nothing more to fan out.
    String list = args.getString("list");
    if (list == null || list.isEmpty()) {
      return;
    }

    for (String node : list.split(",")) {
      // Skip empty tokens (e.g. from a leading or doubled comma) instead of failing on them.
      if (node.isEmpty()) {
        continue;
      }
      result = new Tuple(partition, Integer.parseInt(node), source);
      functionCall.getOutputCollector().add(result);
    }
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment