Skip to content

Instantly share code, notes, and snippets.

@balidani
Last active August 29, 2015 14:16
Show Gist options
  • Save balidani/d9789b713e559d867d5c to your computer and use it in GitHub Desktop.
Save balidani/d9789b713e559d867d5c to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class JoinBugTest {
public JoinBugTest() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
DataSet<Tuple2<Long, Long>> edges = env.generateSequence(0, 100)
.flatMap(new FlatMapFunction<Long, Tuple2<Long, Long>>() {
@Override
public void flatMap(Long value, Collector<Tuple2<Long, Long>> collector) throws Exception {
collector.collect(new Tuple2<Long, Long>(value, value + 1));
}
}).distinct();
edges.join(edges
.groupBy(0)
.reduceGroup(new EdgeCandidateReducer()))
.where(0, 1)
.equalTo(new EdgeKeySelector())
.map(new EdgeCandidateCountMapper())
.sum(0)
.print();
env.execute("Join test");
}
private static final class EdgeCandidateReducer
implements GroupReduceFunction<Tuple2<Long, Long>,Tuple2<Long, Long>> {
@Override
public void reduce(Iterable<Tuple2<Long, Long>> iterable,
Collector<Tuple2<Long, Long>> out) throws Exception {
for (Tuple2<Long, Long> edge : iterable) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1));
}
}
}
private static final class EdgeCandidateCountMapper
implements MapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple1<Long>> {
@Override
public Tuple1<Long> map(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> input) throws Exception {
return new Tuple1<Long>(1L);
}
}
private static final class EdgeKeySelector implements KeySelector<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> getKey(Tuple2<Long, Long> edge) throws Exception {
return new Tuple2<Long, Long>(edge.f0, edge.f1);
}
}
public static void main(String[] args) throws Exception {
new JoinBugTest();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment