Skip to content

Instantly share code, notes, and snippets.

@fhussonnois
Created March 11, 2024 09:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fhussonnois/ba3123afbba3b61ec5d79926f3466525 to your computer and use it in GitHub Desktop.
Save fhussonnois/ba3123afbba3b61ec5d79926f3466525 to your computer and use it in GitHub Desktop.
Kafka Streams: DeduplicationTransformer
/**
 * Drops records whose event id has already been seen within the same fixed-size, epoch-aligned
 * time window, where the window is derived from the record timestamp.
 *
 * <p>The event id is derived from each record via {@code idExtractor}. Records for which it returns
 * {@code null} are always forwarded unchanged. Seen ids are recorded in the {@link WindowStore}
 * registered under {@code storeName}, keyed by id and timestamped with the record timestamp.
 *
 * <p>Windows are aligned to multiples of {@code windowSize} (tumbling). Two occurrences of the same
 * id that fall on either side of a window boundary are therefore NOT treated as duplicates, even
 * if they are only 1 ms apart.
 *
 * <p>NOTE(review): the store's retention presumably needs to be at least {@code windowSize} for
 * lookups to find earlier ids. Confirm where the store is built.
 *
 * @param <K> record key type
 * @param <V> record value type
 * @param <E> event-id type, used as the window-store key
 */
public class DeduplicationTransformer<K, V, E> implements ValueTransformerWithKey<K, V, V> {

    private ProcessorContext context;
    private WindowStore<E, Long> eventIdStore;

    private final String storeName;
    private final KeyValueMapper<K, V, E> idExtractor;
    /** Window length in milliseconds; validated to be >= 1 so window arithmetic never divides by 0. */
    private final long windowSizeMs;

    /**
     * @param storeName   name of the window store holding seen event ids
     * @param windowSize  length of each deduplication window; must be at least one millisecond
     * @param idExtractor derives the event id from key and value; may return {@code null} to
     *                    bypass deduplication for that record
     * @throws NullPointerException     if any argument is {@code null}
     * @throws IllegalArgumentException if {@code windowSize} is negative or shorter than 1 ms
     */
    DeduplicationTransformer(final String storeName,
                             final Duration windowSize,
                             final KeyValueMapper<K, V, E> idExtractor) {
        java.util.Objects.requireNonNull(windowSize, "windowSize");
        // The window math below works in whole milliseconds, so a positive but sub-millisecond
        // duration (toMillis() == 0) must be rejected too; it would otherwise cause a division by zero.
        if (windowSize.isNegative() || windowSize.toMillis() < 1) {
            throw new IllegalArgumentException(
                "windowSize duration per event must be >= 1 ms, but was: " + windowSize);
        }
        this.windowSizeMs = windowSize.toMillis();
        this.idExtractor = java.util.Objects.requireNonNull(idExtractor, "idExtractor");
        this.storeName = java.util.Objects.requireNonNull(storeName, "storeName");
    }

    /** Captures the processor context and looks up the window store named {@code storeName}. */
    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
    }

    /**
     * Forwards {@code value} the first time its event id is seen in the current window. Returns
     * {@code null} for repeats; callers presumably filter null values out downstream.
     *
     * <p>In both cases the id is written to the store at the current record timestamp.
     */
    @Override
    public V transform(final K readOnlyKey, final V value) {
        final E eventId = idExtractor.apply(readOnlyKey, value);
        if (eventId == null) {
            return value;
        }
        // Read the record timestamp once so the lookup and the write agree on it.
        final long eventTime = context.timestamp();
        if (isDuplicate(eventId, eventTime)) {
            updateTimestampOfExistingEventToPreventExpiry(eventId, eventTime);
            return null;
        }
        rememberNewEvent(eventId, eventTime);
        return value;
    }

    /**
     * Returns whether {@code eventId} already has an entry in the aligned window containing
     * {@code eventTime}.
     */
    private boolean isDuplicate(final E eventId, final long eventTime) {
        // Step 1) Compute the window [start, end] that the event belongs to.
        // floorDiv keeps negative timestamps in the correct window; '/' truncates toward zero.
        final long windowStartMs = Math.floorDiv(eventTime, windowSizeMs) * windowSizeMs;
        // fetch() bounds are inclusive on both ends, so stop 1 ms before the next window starts.
        final long windowEndMs = windowStartMs + windowSizeMs - 1;

        // Steps 2-3) Any entry for this id inside the window means we have seen it already.
        // try-with-resources ensures the store iterator is released even if hasNext() throws.
        try (WindowStoreIterator<Long> timeIterator =
                 eventIdStore.fetch(eventId, windowStartMs, windowEndMs)) {
            return timeIterator.hasNext();
        }
    }

    /** Writes an additional store entry for {@code eventId} at {@code newTimestamp}. */
    private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
        eventIdStore.put(eventId, newTimestamp, newTimestamp);
    }

    /** Records the first sighting of {@code eventId} at {@code timestamp}. */
    private void rememberNewEvent(final E eventId, final long timestamp) {
        eventIdStore.put(eventId, timestamp, timestamp);
    }

    @Override
    public void close() {
        // Note: The store should NOT be closed manually here via `eventIdStore.close()`!
        // The Kafka Streams API will automatically close stores when necessary.
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment