-
-
Save balidani/d9789b713e559d867d5c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.common.functions.GroupReduceFunction; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.api.java.tuple.Tuple1; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.util.Collector; | |
public class JoinBugTest { | |
public JoinBugTest() throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); | |
DataSet<Tuple2<Long, Long>> edges = env.generateSequence(0, 100) | |
.flatMap(new FlatMapFunction<Long, Tuple2<Long, Long>>() { | |
@Override | |
public void flatMap(Long value, Collector<Tuple2<Long, Long>> collector) throws Exception { | |
collector.collect(new Tuple2<Long, Long>(value, value + 1)); | |
} | |
}).distinct(); | |
edges.join(edges | |
.groupBy(0) | |
.reduceGroup(new EdgeCandidateReducer())) | |
.where(0, 1) | |
.equalTo(new EdgeKeySelector()) | |
.map(new EdgeCandidateCountMapper()) | |
.sum(0) | |
.print(); | |
env.execute("Join test"); | |
} | |
private static final class EdgeCandidateReducer | |
implements GroupReduceFunction<Tuple2<Long, Long>,Tuple2<Long, Long>> { | |
@Override | |
public void reduce(Iterable<Tuple2<Long, Long>> iterable, | |
Collector<Tuple2<Long, Long>> out) throws Exception { | |
for (Tuple2<Long, Long> edge : iterable) { | |
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1)); | |
} | |
} | |
} | |
private static final class EdgeCandidateCountMapper | |
implements MapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple1<Long>> { | |
@Override | |
public Tuple1<Long> map(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> input) throws Exception { | |
return new Tuple1<Long>(1L); | |
} | |
} | |
private static final class EdgeKeySelector implements KeySelector<Tuple2<Long, Long>, Tuple2<Long, Long>> { | |
@Override | |
public Tuple2<Long, Long> getKey(Tuple2<Long, Long> edge) throws Exception { | |
return new Tuple2<Long, Long>(edge.f0, edge.f1); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
new JoinBugTest(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment