Skip to content

Instantly share code, notes, and snippets.

@mxm
Last active June 25, 2020 09:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mxm/c2e9c459a9d82c18d789 to your computer and use it in GitHub Desktop.
Save mxm/c2e9c459a9d82c18d789 to your computer and use it in GitHub Desktop.
Example of a LEFT OUTER JOIN in Apache Flink
package org.myorg.quickstart;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Demonstrates a LEFT OUTER JOIN on Flink's DataSet API, built from a coGroup on field 0.
 *
 * <p>Every element of the left input is emitted at least once. It is paired with the key of each
 * right element in the same key group, or with {@link LeftOuterJoin#NULL_ELEMENT} when the group
 * has no right elements.
 */
public class LeftOuterJoinExample {

    /**
     * CoGroup function implementing left-outer-join semantics.
     *
     * <p>For each key group it emits {@code (leftKey, rightKey)} for every left/right pair. For a
     * group with no right elements, it emits {@code (leftKey, NULL_ELEMENT)} for each left element.
     * A group with no left elements produces no output.
     */
    public static class LeftOuterJoin
            implements CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                    Tuple2<Integer, Integer>> {

        /**
         * Placeholder emitted in place of a right key when a left element has no match.
         *
         * <p>A sentinel is used instead of {@code null} (presumably because Flink tuple fields
         * must not be null). Note that it is ambiguous if -1 can also occur as a real key.
         */
        public static final int NULL_ELEMENT = -1;

        /**
         * Emits the left-outer-join result for one key group.
         *
         * @param leftElements  left-side elements of this key group; traversed once
         * @param rightElements right-side elements of this key group; traversed once
         * @param out           receives {@code (leftKey, rightKey)} or {@code (leftKey, NULL_ELEMENT)}
         */
        @Override
        public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
                            Iterable<Tuple2<Integer, String>> rightElements,
                            Collector<Tuple2<Integer, Integer>> out) throws Exception {
            // Flink's coGroup iterables can only be traversed once. The right side must therefore
            // be buffered before it can be replayed for every left element; re-iterating it inside
            // the left loop fails as soon as a group has more than one left element. Only the
            // immutable Integer keys are kept, which stays correct even if Flink reuses objects.
            List<Integer> rightKeys = new ArrayList<Integer>();
            for (Tuple2<Integer, String> rightElem : rightElements) {
                rightKeys.add(rightElem.f0);
            }

            for (Tuple2<Integer, String> leftElem : leftElements) {
                if (rightKeys.isEmpty()) {
                    out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
                } else {
                    for (Integer rightKey : rightKeys) {
                        out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightKey));
                    }
                }
            }
        }
    }

    /** Maps a bare key to a {@code (key, payload)} tuple that carries a fixed payload string. */
    private static final class AttachPayload implements MapFunction<Integer, Tuple2<Integer, String>> {

        private final String payload;

        AttachPayload(String payload) {
            this.payload = payload;
        }

        @Override
        public Tuple2<Integer, String> map(Integer key) throws Exception {
            return new Tuple2<Integer, String>(key, payload);
        }
    }

    /**
     * Left-outer-joins keys 1..5 with keys 4..10 and prints the result.
     *
     * <p>Expected output, in no particular order: (1,-1), (2,-1), (3,-1), (4,4), (5,5).
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
        DataSet<Tuple2<Integer, String>> leftSide2 = leftSide.map(new AttachPayload("some data"));

        DataSource<Integer> rightSide = env.fromElements(4, 5, 6, 7, 8, 9, 10);
        DataSet<Tuple2<Integer, String>> rightSide2 =
                rightSide.map(new AttachPayload("some other data"));

        DataSet<Tuple2<Integer, Integer>> leftOuterJoin = leftSide2.coGroup(rightSide2)
                .where(0)
                .equalTo(0)
                .with(new LeftOuterJoin());

        // print() triggers job execution itself (Flink 0.9+). A subsequent env.execute() would
        // find no new data sinks and throw, so it is intentionally not called here.
        leftOuterJoin.print();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment