Skip to content

Instantly share code, notes, and snippets.

@TheNilesh
Created June 19, 2020 06:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TheNilesh/81fdba0db00861ba3eeca0c4589fb10f to your computer and use it in GitHub Desktop.
Save TheNilesh/81fdba0db00861ba3eeca0c4589fb10f to your computer and use it in GitHub Desktop.
Apache Flink Example of using CoGroupFunction on shoe stream
package com.quickheal.correlation;
import java.util.Date;
import java.util.Properties;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class CoGroupExampleKafkaConsumer {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> shoeStream = env.addSource(new FlinkKafkaConsumer010<>("test",
new SimpleStringSchema(), createConsumerProperties()));
DataStream<Tuple2<String, String>> tupleShoeStream = shoeStream
.map(new MapFunction<String, Tuple2<String, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] items = value.split(",");
String color = items[0];
String leg = items[1];
return Tuple2.of(color, leg);
}
})
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(1)));
final OutputTag<String> outputTag =
new OutputTag<String>("side-output"){
private static final long serialVersionUID = -1L;
};
SingleOutputStreamOperator<String> leftShoeStream = tupleShoeStream
.process(new ProcessFunction<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(
Tuple2<String, String> value,
Context ctx,
Collector<String> out) throws Exception {
if (value.f1.startsWith("L") || value.f1.startsWith("l")) {
out.collect(value.f0);
} else {
ctx.output(outputTag, value.f0);
}
}
});
DataStream<String> rightShoeStream = leftShoeStream.getSideOutput(outputTag);
DataStream<String> cogroupResult = leftShoeStream.coGroup(rightShoeStream)
.where(new FirstFieldSelector()) //f0
.equalTo(new FirstFieldSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public void coGroup(Iterable<String> leftShoes, Iterable<String> rightShoes,
Collector<String> out) throws Exception {
System.out.println(System.currentTimeMillis());
for (String shoeColor : leftShoes) {
System.out.println(shoeColor + ",left");
}
for (String shoeColor : rightShoes) {
System.out.println(shoeColor + ",right");
}
System.out.println("------------------");
}
});
cogroupResult.print();
env.execute();
}
protected static Properties createConsumerProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "connection");
return properties;
}
public static class FirstFieldSelector implements KeySelector<String, String>{
private static final long serialVersionUID = 1L;
@Override
public String getKey(String value) throws Exception {
return value;
}
}
public static class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
public TimestampExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, String> event) {
return new Date().getTime();
}
}
}
@TheNilesh
Copy link
Author

Output:

C:\kafka>bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
>blue,l
>blue,l
>blue,r

C:>java CoGroupShoes.java
1592547269989
blue,left
------------------
1592547275606
blue,left
------------------

Where is the last shoe?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment