Skip to content

Instantly share code, notes, and snippets.

@fhueske
Last active August 18, 2020 07:49
Show Gist options
  • Save fhueske/e7820b47c2f8739931e9fc1a66d2268e to your computer and use it in GitHub Desktop.
Save fhueske/e7820b47c2f8739931e9fc1a66d2268e to your computer and use it in GitHub Desktop.
Simple test for checking side outputs from onTimer() method.
package com.ververica;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class UserTest {
public static OutTag sideOutput = new OutTag();
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Tuple2<Integer, String>> src1 = env.addSource(new GenSrc("first_"));
DataStream<Tuple2<Integer, String>> src2 = env.addSource(new GenSrc("second_"));
SingleOutputStreamOperator<String> connected = src1.keyBy(x -> x.f0)
.connect(src2.keyBy(x -> x.f0)).process(new MyCoFunc());
DataStream<String> sideOut = connected.getSideOutput(sideOutput);
connected.print();
sideOut.print();
env.execute();
}
public static class OutTag extends OutputTag<String> {
public OutTag() {
super("outT");
}
}
public static class MyCoFunc extends KeyedCoProcessFunction<Integer, Tuple2<Integer, String>, Tuple2<Integer, String>, String> {
@Override
public void processElement1(Tuple2<Integer, String> v, Context ctx, Collector<String> collector) {
collector.collect(v.f1);
ctx.output(sideOutput, "SideOutput" + v.f1);
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 500);
}
@Override
public void processElement2(Tuple2<Integer, String> v, Context ctx, Collector<String> collector) {
collector.collect(v.f1);
ctx.output(sideOutput, "SideOutput" + v.f1);
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 500);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
ctx.output(sideOutput, "TimerSideOut @ " + timestamp);
}
}
public static class GenSrc implements SourceFunction<Tuple2<Integer, String>> {
private final String val;
private long cnt;
private int keyCnt;
public GenSrc(String val) {
this.val = val;
this.cnt = 0;
this.keyCnt = 0;
}
@Override
public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
while (true) {
int key = keyCnt++;
if (key > 4) {
key = 0;
}
sourceContext.collect(Tuple2.of(key, val + (cnt++)));
Thread.sleep(1000);
}
}
@Override
public void cancel() { }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment