Created
December 21, 2023 13:26
-
-
Save matanper/3e7d1c7ad6e1b9f88f11b5c4259df241 to your computer and use it in GitHub Desktop.
Apache Flink deduplication by session key
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.monocle; | |
import java.io.IOException; | |
import org.apache.flink.api.common.state.ValueState; | |
import org.apache.flink.api.common.state.ValueStateDescriptor; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.flink.util.Collector; | |
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; | |
public class DeduplicationKeySession<K, T> extends KeyedProcessFunction<K, T, T> { | |
private transient ValueState<Boolean> eventEmitted; | |
private transient ValueState<Long> timerState; | |
private Time sessionLength; | |
public DeduplicationKeySession(Time sessionLength) { | |
this.sessionLength = sessionLength; | |
} | |
@Override | |
public void open(Configuration parameters) { | |
this.eventEmitted = getRuntimeContext().getState(new ValueStateDescriptor<>("emitted", Boolean.class)); | |
this.timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class)); | |
} | |
@Override | |
public void processElement(T value, KeyedProcessFunction<K, T, T>.Context ctx, Collector<T> out) throws IOException { | |
// Emitting only first value, then ignore the rest until state is cleared | |
Boolean alreadyEmitted = eventEmitted.value(); | |
if (alreadyEmitted == null || !alreadyEmitted) { | |
out.collect(value); | |
eventEmitted.update(true); | |
} | |
// In order to act like session window, remove previous timer if needed and creates new one | |
Long currentTimer = timerState.value(); | |
if (currentTimer == null || ctx.timestamp() > currentTimer) { | |
if (currentTimer != null) { | |
ctx.timerService().deleteEventTimeTimer(currentTimer); | |
} | |
long newTimerTime = ctx.timestamp() + sessionLength.toMilliseconds(); | |
ctx.timerService().registerEventTimeTimer(newTimerTime); | |
timerState.update(newTimerTime); | |
} | |
} | |
@Override | |
public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) { | |
eventEmitted.clear(); | |
timerState.clear(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment