Created
March 11, 2024 09:42
-
-
Save fhussonnois/ba3123afbba3b61ec5d79926f3466525 to your computer and use it in GitHub Desktop.
Kafka Streams : DeduplicationTransformer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DeduplicationTransformer<K, V, E> implements ValueTransformerWithKey<K, V, V> { | |
private ProcessorContext context; | |
private WindowStore<E, Long> eventIdStore; | |
private final String storeName; | |
private final KeyValueMapper<K, V, E> idExtractor; | |
private final Duration windowSize; | |
DeduplicationTransformer(final String storeName, | |
final Duration windowSize, | |
final KeyValueMapper<K, V, E> idExtractor) { | |
if (windowSize.isZero() || windowSize.isNegative()) { | |
throw new IllegalArgumentException("windowSize duration per event must be >= 1"); | |
} | |
this.windowSize = windowSize; | |
this.idExtractor = idExtractor; | |
this.storeName = storeName; | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public void init(final ProcessorContext context) { | |
this.context = context; | |
eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName); | |
} | |
@Override | |
public V transform(final K readOnlyKey, final V value) { | |
final E eventId = idExtractor.apply(readOnlyKey, value); | |
if (eventId == null) { | |
return value; | |
} else { | |
final V output; | |
if (isDuplicate(eventId)) { | |
output = null; | |
updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp()); | |
} else { | |
output = value; | |
rememberNewEvent(eventId, context.timestamp()); | |
} | |
return output; | |
} | |
} | |
private boolean isDuplicate(final E eventId) { | |
final long eventTime = context.timestamp(); | |
// TODO Step 1) Calculate the time window (startTime - endTime) to which the event belongs. | |
long leftDurationMs = (eventTime / windowSize.toMillis()) * windowSize.toMillis(); | |
long rightDurationMs = leftDurationMs + windowSize.toMillis(); | |
// TODO Step 2) Query the local WindowStore using the given event and window computed previously. | |
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch( | |
eventId, | |
leftDurationMs, | |
rightDurationMs); | |
// TODO Step 3) Check if the event is a duplicate. | |
final boolean isDuplicate = timeIterator.hasNext(); | |
timeIterator.close(); | |
return isDuplicate; | |
} | |
private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) { | |
eventIdStore.put(eventId, newTimestamp, newTimestamp); | |
} | |
private void rememberNewEvent(final E eventId, final long timestamp) { | |
eventIdStore.put(eventId, timestamp, timestamp); | |
} | |
@Override | |
public void close() { | |
// Note: The store should NOT be closed manually here via `eventIdStore.close()`! | |
// The Kafka Streams API will automatically close stores when necessary. | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment