Created
January 29, 2012 11:00
-
-
Save friso/1698283 to your computer and use it in GitHub Desktop.
Edge list to adjacency list representation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class PrepareJob { | |
public static int run(String input, String output) { | |
Scheme sourceScheme = new TextDelimited(new Fields("source", "target"), ","); | |
Tap source = new Hfs(sourceScheme, input); | |
Scheme sinkScheme = new TextDelimited(new Fields("partition", "source", "list"), "\t"); | |
Tap sink = new Hfs(sinkScheme, output, SinkMode.REPLACE); | |
Pipe prepare = new Pipe("prepare"); | |
//Parse string to int's, | |
//this way we get numerical sorting, instead of text sorting | |
prepare = new Each(prepare, new Identity(Integer.TYPE, Integer.TYPE)); | |
//GROUP BY source node ORDER BY target DESCENDING | |
prepare = new GroupBy(prepare, new Fields("source"), new Fields("target"), true); | |
//For every group, run the ToAdjacencyList aggregator | |
prepare = new Every(prepare, new ToAdjacencyList(), Fields.RESULTS); | |
Properties properties = new Properties(); | |
FlowConnector.setApplicationJarClass(properties, PrepareJob.class); | |
FlowConnector flowConnector = new FlowConnector(properties); | |
Flow flow = flowConnector.connect("originalSet", source, sink, prepare); | |
//GO! | |
flow.complete(); | |
return 0; | |
} | |
//In Cascading a aggregator needs a context object to keep state | |
private static class ToAdjacencyListContext { | |
int source; | |
int partition = -1; | |
List<Integer> targets = new ArrayList<Integer>(); | |
} | |
private static class ToAdjacencyList | |
extends BaseOperation<ToAdjacencyListContext> | |
implements Aggregator<ToAdjacencyListContext> { | |
public ToAdjacencyList() { | |
super(new Fields("partition", "source", "list")); | |
} | |
@Override | |
public void start( | |
FlowProcess flowProcess, | |
AggregatorCall<ToAdjacencyListContext> aggregatorCall) { | |
//Store source node | |
ToAdjacencyListContext context = new ToAdjacencyListContext(); | |
context.source = aggregatorCall.getGroup().getInteger("source"); | |
aggregatorCall.setContext(context); | |
} | |
@Override | |
public void aggregate( | |
FlowProcess flowProcess, | |
AggregatorCall<ToAdjacencyListContext> aggregatorCall) { | |
ToAdjacencyListContext context = aggregatorCall.getContext(); | |
TupleEntry arguments = aggregatorCall.getArguments(); | |
//Set the partition ID to max(source ID, target IDs) | |
int target = arguments.getInteger("target"); | |
if (context.partition == -1) { | |
context.partition = target > context.source ? target : context.source; | |
} | |
//Add each target to the adjacency list | |
context.targets.add(target); | |
} | |
@Override | |
public void complete( | |
FlowProcess flowProcess, | |
AggregatorCall<ToAdjacencyListContext> aggregatorCall) { | |
ToAdjacencyListContext context = aggregatorCall.getContext(); | |
//Ouput a single tuple with the partition, source node and adjacency list | |
Tuple result = new Tuple( | |
context.partition, context.source, | |
StringUtils.joinObjects(",", context.targets)); | |
aggregatorCall.getOutputCollector().add(result); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment