Skip to content

Instantly share code, notes, and snippets.

Created June 26, 2012 08:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/451fc6a0c41f946dbdbf to your computer and use it in GitHub Desktop.
Save anonymous/451fc6a0c41f946dbdbf to your computer and use it in GitHub Desktop.
2 2 1 0 foo 4 0 2012 2013 5 1
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.util.Random;
/**
 * Aggregator that keeps two running integer sums per grouping and emits them
 * as a single result tuple with the fields {@code tmp1} and {@code tmp2}.
 *
 * <p>The values added to the sums are pseudo-random and do not depend on the
 * incoming argument tuples, so the emitted numbers are not reproducible
 * between runs.
 */
public class PayloadParser extends BaseOperation<PayloadParser.Context> implements Aggregator<PayloadParser.Context> {

  /** Fields passed to the super constructor and used for the emitted result entry. */
  public static final Fields fields = new Fields("tmp1", "tmp2");

  /**
   * Single shared generator. Creating a new {@link Random} for every value
   * (twice per aggregated tuple) is wasted allocation and re-seeding;
   * {@code java.util.Random} instances are documented as thread-safe, so one
   * instance can serve all calls.
   */
  private static final Random RANDOM = new Random();

  /** Per-grouping accumulator holding the two running sums. */
  public static class Context {
    /** Running sum emitted as the first value of the result tuple. */
    public Integer tmp;
    /** Running sum emitted as the second value of the result tuple. */
    public Integer tmp2;
  }

  /** Creates the operation, handing {@link #fields} to the {@code BaseOperation} constructor. */
  public PayloadParser() {
    super(fields);
  }

  /**
   * Installs a fresh {@link Context} with both sums set to zero on the call.
   *
   * @param flowProcess    unused
   * @param aggregatorCall call object that receives the new context
   */
  @Override
  public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
    Context context = new Context();
    context.tmp = 0;
    context.tmp2 = 0;
    aggregatorCall.setContext(context);
  }

  /**
   * Adds one pseudo-random int to each running sum. The arguments of the call
   * are never read. The addition is plain {@code int} arithmetic, so the sums
   * wrap around on overflow.
   *
   * @param flowProcess    unused
   * @param aggregatorCall call object carrying the context to update
   */
  @Override
  public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
    Context context = aggregatorCall.getContext();
    context.tmp = context.tmp + getInt();
    context.tmp2 = context.tmp2 + getInt();
  }

  /**
   * Returns the next pseudo-random int from the shared generator.
   *
   * @return any int value, positive or negative
   */
  private int getInt() {
    return RANDOM.nextInt();
  }

  /**
   * Emits the two accumulated sums as one {@link TupleEntry} declared with
   * {@link #fields}.
   *
   * @param flowProcess    unused
   * @param aggregatorCall call object providing the context and the output collector
   */
  @Override
  public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
    Context context = aggregatorCall.getContext();

    Tuple sums = new Tuple();
    sums.add(context.tmp);
    sums.add(context.tmp2);

    TupleEntry result = new TupleEntry(fields);
    result.setTuple(sums);
    aggregatorCall.getOutputCollector().add(result);
  }
}
import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.operation.aggregator.Count;
import cascading.pipe.CoGroup;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.test.LocalPlatform;
import cascading.test.PlatformRunner;
import cascading.tuple.Fields;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
 * Platform test that wires two aggregated branches into a {@link CoGroup} and
 * runs the resulting flow on the local platform.
 *
 * <p>The test contains no assertions on the output; it passes when the flow
 * is planned and completes without throwing.
 */
@PlatformRunner.Platform(LocalPlatform.class)
public class ExtractLinksTest extends PlatformTestCase {

  /**
   * Groups "modules" by {@code b1} and aggregates with {@link PayloadParser},
   * groups "pages" by {@code a} and aggregates with {@link Count}, then inner
   * joins the two branches on {@code b1 == a} and writes the result to the
   * "cross" output path.
   */
  @Test
  public void testCoGroup() {
    // Typed map instead of the raw Map/HashMap so the key and value types are compiler-checked.
    Map<String, Tap> sources = new HashMap<String, Tap>();
    // NOTE(review): "\n" is passed as the field delimiter for both inputs, which looks
    // unlikely to split a row into the 5 and 11 declared fields - confirm against the data files.
    sources.put("pages", getPlatform().getDelimitedFile(new Fields("a", "b", "c", "d", "e"), "\n", "pages.txt"));
    sources.put("modules", getPlatform().getDelimitedFile(new Fields("b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "b10", "b11"), "\n", "modules.txt"));

    // REPLACE: the sink is opened in replace mode for the "cross" output path.
    Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("cross"), SinkMode.REPLACE);

    // Branch 1: modules grouped on b1, aggregated by PayloadParser.
    Pipe modules = new Pipe("modules");
    modules = new GroupBy(modules, new Fields("b1"));
    modules = new Every(modules, new PayloadParser());

    // Branch 2: pages grouped on a, aggregated by Count.
    Pipe pages = new Pipe("pages");
    pages = new GroupBy(pages, new Fields("a"));
    pages = new Every(pages, new Count());

    // Inner join of the two aggregated branches on b1 == a.
    Pipe cross = new CoGroup(modules, new Fields("b1"), pages, new Fields("a"), new InnerJoin());

    Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cross);
    flow.complete();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment