-
-
Save anonymous/451fc6a0c41f946dbdbf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 2 1 0 foo 4 0 2012 2013 5 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 1 -1 1 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cascading.flow.FlowProcess; | |
import cascading.operation.Aggregator; | |
import cascading.operation.AggregatorCall; | |
import cascading.operation.BaseOperation; | |
import cascading.tuple.Fields; | |
import cascading.tuple.Tuple; | |
import cascading.tuple.TupleEntry; | |
import java.util.Random; | |
public class PayloadParser extends BaseOperation<PayloadParser.Context> implements Aggregator<PayloadParser.Context> { | |
public static Fields fields = new Fields("tmp1", "tmp2"); | |
public static class Context { | |
public Integer tmp; | |
public Integer tmp2; | |
} | |
public PayloadParser() { | |
super(fields); | |
} | |
@Override | |
public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { | |
Context c = new Context(); | |
c.tmp = 0; | |
c.tmp2 = 0; | |
aggregatorCall.setContext(c); | |
} | |
@Override | |
public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { | |
Context c = aggregatorCall.getContext(); | |
c.tmp = c.tmp + getInt(); | |
c.tmp2 = c.tmp2 + getInt(); | |
} | |
private int getInt() { | |
return new Random().nextInt(); | |
} | |
@Override | |
public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) { | |
TupleEntry te = new TupleEntry(fields); | |
Tuple t = new Tuple(); | |
t.add(aggregatorCall.getContext().tmp); | |
t.add(aggregatorCall.getContext().tmp2); | |
te.setTuple(t); | |
aggregatorCall.getOutputCollector().add(te); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cascading.PlatformTestCase; | |
import cascading.flow.Flow; | |
import cascading.operation.aggregator.Count; | |
import cascading.pipe.CoGroup; | |
import cascading.pipe.Every; | |
import cascading.pipe.GroupBy; | |
import cascading.pipe.Pipe; | |
import cascading.pipe.joiner.InnerJoin; | |
import cascading.tap.SinkMode; | |
import cascading.tap.Tap; | |
import cascading.test.LocalPlatform; | |
import cascading.test.PlatformRunner; | |
import cascading.tuple.Fields; | |
import org.junit.Test; | |
import java.util.HashMap; | |
import java.util.Map; | |
@PlatformRunner.Platform(LocalPlatform.class) | |
public class ExtractLinksTest extends PlatformTestCase { | |
@Test | |
public void testCoGroup() { | |
Map sources = new HashMap(); | |
sources.put("pages", getPlatform().getDelimitedFile(new Fields("a", "b", "c", "d", "e"), "\n", "pages.txt")); | |
sources.put("modules", getPlatform().getDelimitedFile(new Fields("b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "b10", "b11"), "\n", "modules.txt")); | |
Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("cross"), SinkMode.REPLACE); | |
Pipe pages = new Pipe("pages"); | |
Pipe modules = new Pipe("modules"); | |
modules = new GroupBy(modules, new Fields("b1")); | |
modules = new Every(modules, new PayloadParser()); | |
pages = new GroupBy(pages, new Fields("a")); | |
pages = new Every(pages, new Count()); | |
Pipe cross = new CoGroup(modules, new Fields("b1"), pages, new Fields("a"), new InnerJoin()); | |
Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cross); | |
flow.complete(); | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.