Skip to content

Instantly share code, notes, and snippets.

@matanper
Created December 21, 2023 13:26
Show Gist options
  • Save matanper/3e7d1c7ad6e1b9f88f11b5c4259df241 to your computer and use it in GitHub Desktop.
Save matanper/3e7d1c7ad6e1b9f88f11b5c4259df241 to your computer and use it in GitHub Desktop.
Apache Flink deduplication by session key
package org.monocle;
import java.io.IOException;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
/**
 * Deduplicates a keyed stream per session: for every key, the first element of a session is
 * emitted and all following elements are dropped until the session expires.
 *
 * <p>A session for a key stays open as long as elements keep arriving with event timestamps less
 * than {@code sessionLength} apart. Each element pushes the expiry to
 * {@code elementTimestamp + sessionLength}; when the event-time timer registered for that expiry
 * fires, the per-key state is cleared and the next element for the key is emitted again.
 *
 * <p>The function relies on event-time timers, so the input records must carry timestamps.
 *
 * @param <K> type of the key the stream is partitioned by
 * @param <T> type of the elements, which is both the input and the output type
 */
public class DeduplicationKeySession<K, T> extends KeyedProcessFunction<K, T, T> {

    /** Per-key flag, {@code true} once an element has been emitted for the current session. */
    private transient ValueState<Boolean> eventEmitted;

    /** Per-key timestamp of the currently registered event-time timer; empty if none is set. */
    private transient ValueState<Long> timerState;

    /** Inactivity gap after which the per-key state is cleared. */
    private final Time sessionLength;

    /**
     * Creates the deduplication function.
     *
     * @param sessionLength inactivity gap after which a key's session expires; must not be null
     * @throws NullPointerException if {@code sessionLength} is null
     */
    public DeduplicationKeySession(Time sessionLength) {
        // Fail when the job graph is built rather than later on the task manager.
        if (sessionLength == null) {
            throw new NullPointerException("sessionLength must not be null");
        }
        this.sessionLength = sessionLength;
    }

    /** Obtains the handles of the two keyed states used by this function. */
    @Override
    public void open(Configuration parameters) {
        this.eventEmitted = getRuntimeContext().getState(new ValueStateDescriptor<>("emitted", Boolean.class));
        this.timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
    }

    /**
     * Emits {@code value} if it is the first element of the key's current session, then extends
     * the session so that it expires {@code sessionLength} after this element's timestamp.
     *
     * @param value the incoming element
     * @param ctx   context giving access to the element timestamp and the timer service
     * @param out   collector that receives the first element of each session
     * @throws IOException           if reading or updating keyed state fails
     * @throws IllegalStateException if the element carries no event timestamp
     */
    @Override
    public void processElement(T value, KeyedProcessFunction<K, T, T>.Context ctx, Collector<T> out) throws IOException {
        // The timestamp is a boxed Long and may be null when records carry no timestamp;
        // check it before touching any state instead of failing on auto-unboxing later.
        Long elementTimestamp = ctx.timestamp();
        if (elementTimestamp == null) {
            throw new IllegalStateException(
                    "DeduplicationKeySession requires records with event timestamps, but the current record has none");
        }

        // Emit only the first value, then ignore the rest until the state is cleared.
        Boolean alreadyEmitted = eventEmitted.value();
        if (alreadyEmitted == null || !alreadyEmitted) {
            out.collect(value);
            eventEmitted.update(true);
        }

        // Behave like a session window: every element moves the expiry forward to
        // (its timestamp + gap). Compare expiry against expiry so that elements arriving inside
        // the session extend it; out-of-order elements that would shorten it are ignored.
        long newTimerTime = elementTimestamp + sessionLength.toMilliseconds();
        Long currentTimer = timerState.value();
        if (currentTimer == null || newTimerTime > currentTimer) {
            if (currentTimer != null) {
                // Drop the superseded timer so that only the latest expiry ever fires.
                ctx.timerService().deleteEventTimeTimer(currentTimer);
            }
            ctx.timerService().registerEventTimeTimer(newTimerTime);
            timerState.update(newTimerTime);
        }
    }

    /**
     * Ends the key's session by clearing both states, so the next element for this key is
     * emitted again.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       timer context (unused)
     * @param out       collector (unused; nothing is emitted on expiry)
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) {
        eventEmitted.clear();
        timerState.clear();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment