Skip to content

Instantly share code, notes, and snippets.

@friso
Created January 29, 2012 11:00
Show Gist options
  • Save friso/1698283 to your computer and use it in GitHub Desktop.
Save friso/1698283 to your computer and use it in GitHub Desktop.
Edge list to adjacency list representation
/**
 * Cascading job that converts an edge list into an adjacency-list representation.
 *
 * <p>Input: comma-delimited lines with fields {@code source,target}. Output: tab-delimited lines
 * with fields {@code partition, source, list}, one line per source node. {@code list} holds the
 * comma-joined targets of that node, in descending numeric order. {@code partition} is the largest
 * ID among the source node and its targets.
 */
public class PrepareJob {
  /**
   * Builds the edge-list to adjacency-list flow and runs it.
   *
   * @param input path of the comma-delimited edge list, read through {@code Hfs}
   * @param output path the tab-delimited adjacency list is written to; an existing output is
   *     replaced ({@code SinkMode.REPLACE})
   * @return always {@code 0}. NOTE(review): failures presumably surface as exceptions from
   *     {@code Flow.complete()}; confirm against the Cascading version in use.
   */
  public static int run(String input, String output) {
    Scheme sourceScheme = new TextDelimited(new Fields("source", "target"), ",");
    Tap source = new Hfs(sourceScheme, input);

    Scheme sinkScheme = new TextDelimited(new Fields("partition", "source", "list"), "\t");
    Tap sink = new Hfs(sinkScheme, output, SinkMode.REPLACE);

    Pipe assembly = buildAssembly();

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, PrepareJob.class);
    FlowConnector flowConnector = new FlowConnector(properties);
    Flow flow = flowConnector.connect("originalSet", source, sink, assembly);

    // Execute the flow.
    flow.complete();
    return 0;
  }

  /**
   * Builds the pipe assembly in three steps: coerce fields to ints, group by source node, and
   * collapse each group into one adjacency-list tuple.
   */
  private static Pipe buildAssembly() {
    Pipe prepare = new Pipe("prepare");
    // Parse both fields to ints so the secondary sort below is numeric rather than textual.
    prepare = new Each(prepare, new Identity(Integer.TYPE, Integer.TYPE));
    // GROUP BY source ORDER BY target DESCENDING (the trailing 'true' reverses the sort order).
    prepare = new GroupBy(prepare, new Fields("source"), new Fields("target"), true);
    // For every group, run the ToAdjacencyList aggregator and keep only its results.
    return new Every(prepare, new ToAdjacencyList(), Fields.RESULTS);
  }

  /** Per-group state for {@link ToAdjacencyList}; in Cascading an aggregator keeps state in a context object. */
  private static class ToAdjacencyListContext {
    /** Source node ID of the group being aggregated. */
    int source;
    /** Running maximum of the source ID and every target ID seen so far in the group. */
    int partition;
    /** Target IDs in the order the aggregator received them. */
    final List<Integer> targets = new ArrayList<Integer>();
  }

  /**
   * Aggregator that turns all {@code (source, target)} tuples of one source node into a single
   * {@code (partition, source, list)} tuple.
   */
  private static class ToAdjacencyList
      extends BaseOperation<ToAdjacencyListContext>
      implements Aggregator<ToAdjacencyListContext> {

    public ToAdjacencyList() {
      super(new Fields("partition", "source", "list"));
    }

    /** Creates fresh state for a new group, seeding the partition with the source node ID. */
    @Override
    public void start(
        FlowProcess flowProcess,
        AggregatorCall<ToAdjacencyListContext> aggregatorCall) {
      ToAdjacencyListContext context = new ToAdjacencyListContext();
      context.source = aggregatorCall.getGroup().getInteger("source");
      // Seeding with the source ID avoids a sentinel value that could collide with a real
      // (e.g. negative) node ID.
      context.partition = context.source;
      aggregatorCall.setContext(context);
    }

    /** Records one target and updates the partition to max(source ID, target IDs seen so far). */
    @Override
    public void aggregate(
        FlowProcess flowProcess,
        AggregatorCall<ToAdjacencyListContext> aggregatorCall) {
      ToAdjacencyListContext context = aggregatorCall.getContext();
      TupleEntry arguments = aggregatorCall.getArguments();
      int target = arguments.getInteger("target");
      // A running max over every target does not depend on the descending secondary sort.
      context.partition = Math.max(context.partition, target);
      context.targets.add(target);
    }

    /** Emits a single tuple with the partition, the source node and the comma-joined targets. */
    @Override
    public void complete(
        FlowProcess flowProcess,
        AggregatorCall<ToAdjacencyListContext> aggregatorCall) {
      ToAdjacencyListContext context = aggregatorCall.getContext();
      Tuple result = new Tuple(
          context.partition, context.source,
          StringUtils.joinObjects(",", context.targets));
      aggregatorCall.getOutputCollector().add(result);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment