Last active
March 26, 2024 17:34
-
-
Save devgrok/10d02f67204fcf1a650be569a391ef3d to your computer and use it in GitHub Desktop.
Deduplicating in kafka-streams using the Processor API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright Confluent Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.confluent.examples.streams; | |
import org.apache.kafka.common.serialization.*; | |
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; | |
import org.apache.kafka.common.utils.Bytes; | |
import org.apache.kafka.streams.*; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.kstream.Materialized; | |
import org.apache.kafka.streams.kstream.Transformer; | |
import org.apache.kafka.streams.kstream.TransformerSupplier; | |
import org.apache.kafka.streams.kstream.internals.MaterializedInternal; | |
import org.apache.kafka.streams.processor.ProcessorContext; | |
import org.apache.kafka.streams.processor.PunctuationType; | |
import org.apache.kafka.streams.state.*; | |
import org.apache.kafka.test.TestUtils; | |
import org.junit.Test; | |
import java.nio.charset.StandardCharsets; | |
import java.time.Duration; | |
import java.util.*; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.BiFunction; | |
import java.util.stream.Collectors; | |
import static org.hamcrest.CoreMatchers.equalTo; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
/** | |
* End-to-end integration test that demonstrates how to remove duplicate records from an input | |
* stream. | |
* <p> | |
* Here, a stateful {@link Transformer} (from the Processor API) | |
* detects and discards duplicate input records based on an "event id" that is embedded in each | |
* input record. This transformer is then included in a topology defined via the DSL. | |
* <p> | |
* In this simplified example, the values of input records represent the event ID by which | |
* duplicates will be detected. In practice, record values would typically be a more complex data | |
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID | |
* is but one example of how to perform de-duplication in general. The code example below can be | |
* adapted to other de-duplication approaches. | |
* <p> | |
* Note: This example uses lambda expressions and thus works with Java 8+ only. | |
*/ | |
public class DeduplicationKeyValueTransformerIntegrationTest { | |
private static final String storeName = "eventId-store"; | |
/** | |
* Discards duplicate records from the input stream. | |
* <p> | |
* Duplicates are determined by a function passed to it on creation to allow for custom comparisons, that compares | |
* new values with the previous. | |
* <p> | |
* Duplicate records are detected based on an value. The transformer remembers the last value for each key in a state | |
* store, which automatically purges/expires values from the store after a certain amount of | |
* time has passed to prevent the store from growing indefinitely. This assumes the consumer can tolerate some duplicates. | |
* This processor's purpose is to reduce the 'noise' caused by a lot of duplicate messages. | |
* <p> | |
* Note: This code is for demonstration purposes and was not tested for production usage. | |
*/ | |
private static class DedupeTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> { | |
private ProcessorContext context; | |
/** | |
* Key: event ID | |
* Value: timestamp (event-time) of the corresponding event when the event ID was seen for the | |
* first time | |
*/ | |
private TimestampedKeyValueStore<K, V> eventStore; | |
public static final int CLEAR_INTERVAL_MILLIS = 1000; | |
private final long maintainDurationMs; | |
private final BiFunction<V, V, Boolean> valueComparator; | |
/** | |
*/ | |
DedupeTransformer(BiFunction<V, V, Boolean> valueComparator, long maintainDurationMs) { | |
this.valueComparator = valueComparator; | |
this.maintainDurationMs = maintainDurationMs; | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public void init(final ProcessorContext context) { | |
this.context = context; | |
eventStore = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); | |
this.context.schedule(Duration.ofMillis(CLEAR_INTERVAL_MILLIS), PunctuationType.WALL_CLOCK_TIME, this::clearExpired); | |
} | |
private void clearExpired(long currentStreamTimeMs) { | |
try (KeyValueIterator<K, ValueAndTimestamp<V>> iterator = eventStore.all()) { | |
while (iterator.hasNext()) { | |
final KeyValue<K, ValueAndTimestamp<V>> entry = iterator.next(); | |
final long eventTimestamp = entry.value.timestamp(); | |
if (hasExpired(eventTimestamp, currentStreamTimeMs)) { | |
eventStore.delete(entry.key); | |
} | |
} | |
} | |
} | |
private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) { | |
return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs; | |
} | |
public KeyValue<K, V> transform(final K key, final V value) { | |
if (value == null) { | |
return KeyValue.pair(key, null); | |
} else { | |
final KeyValue<K, V> output; | |
if (isDuplicate(key, value)) { | |
output = null; | |
updateTimestampOfExistingEventToPreventExpiry(key, value, context.timestamp()); | |
} else { | |
output = KeyValue.pair(key, value); | |
rememberNewEvent(key, value, context.timestamp()); | |
} | |
return output; | |
} | |
} | |
private boolean isDuplicate(final K key, final V value) { | |
boolean isDuplicate = false; | |
ValueAndTimestamp<V> storedValue = eventStore.get(key); | |
if (storedValue != null) { | |
V previous = storedValue.value(); | |
isDuplicate = (valueComparator.apply(previous, value) == Boolean.TRUE); | |
} | |
return isDuplicate; | |
} | |
private void updateTimestampOfExistingEventToPreventExpiry(final K key, final V value, final long newTimestamp) { | |
eventStore.put(key, ValueAndTimestamp.make(value, newTimestamp)); | |
} | |
private void rememberNewEvent(final K key, final V value, final long timestamp) { | |
eventStore.put(key, ValueAndTimestamp.make(value, timestamp)); | |
} | |
@Override | |
public void close() { | |
// Note: The store should NOT be closed manually here via `eventStore.close()`! | |
// The Kafka Streams API will automatically close stores when necessary. | |
} | |
} | |
private class DedupeValueForKey<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> { | |
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized; | |
private final BiFunction<V, V, Boolean> valueComparator; | |
private long maintainDurationMs = TimeUnit.HOURS.toMillis(30); | |
public DedupeValueForKey(BiFunction<V, V, Boolean> valueComparator, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { | |
this.materialized = new MaterializedInternal<>(materialized, null, null); | |
// can't pass in name provider | |
// new MaterializedInternal<>(materialized, builder, REDUCE_NAME); | |
this.valueComparator = valueComparator; | |
} | |
@Override | |
public Transformer<K, V, KeyValue<K, V>> get() { | |
return new DedupeTransformer( | |
valueComparator, | |
maintainDurationMs | |
); | |
} | |
// provide store(s) that will be added and connected to the associated transformer | |
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext | |
@Override | |
public Set<StoreBuilder<?>> stores() { | |
KeyValueBytesStoreSupplier supplier = Stores.persistentTimestampedKeyValueStore(storeName); | |
StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder( | |
supplier, | |
materialized.keySerde(), | |
materialized.valueSerde()); | |
return Collections.singleton(builder); | |
} | |
public void setMaintainDurationMs(long maintainDurationMs) { | |
this.maintainDurationMs = maintainDurationMs; | |
} | |
} | |
@Test | |
public void shouldRemoveDuplicatesFromTheInput() { | |
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8" | |
final String secondId = UUID.randomUUID().toString(); | |
final String thirdId = UUID.randomUUID().toString(); | |
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); | |
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId, | |
thirdId, thirdId); | |
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList()); | |
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId); | |
// | |
// Step 1: Configure and start the processor topology. | |
// | |
final StreamsBuilder builder = new StreamsBuilder(); | |
final Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
// Use a temporary directory for storing state, which will be automatically removed after the test. | |
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); | |
// How long we "remember" an event. During this time, any incoming duplicates of the event | |
// will be, well, dropped, thereby de-duplicating the input data. | |
// | |
// The actual value depends on your use case. To reduce memory and disk usage, you could | |
// decrease the size to purge old windows more frequently at the cost of potentially missing out | |
// on de-duplicating late-arriving records. | |
final Duration windowSize = Duration.ofMinutes(10); | |
// retention period must be at least window size -- for this use case, we don't need a longer retention period | |
// and thus just use the window size as retention time | |
final Duration retentionPeriod = windowSize; | |
final String inputTopic = "inputTopic"; | |
final String outputTopic = "outputTopic"; | |
final KStream<byte[], String> stream = builder.stream(inputTopic); | |
DedupeValueForKey transformerSupplier = new DedupeValueForKey( | |
(value1, value2) -> value1.equals(value2), | |
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde()) | |
); | |
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis()); | |
final KStream<byte[], String> deduplicated = stream.transform( | |
// In this example, we assume that the record value as-is represents a unique event ID by | |
// which we can perform de-duplication. If your records are different, adapt the comparison | |
// function as needed | |
transformerSupplier | |
); | |
deduplicated.to(outputTopic); | |
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) { | |
// | |
// Step 2: Setup input and output topics. | |
// | |
final TestInputTopic<byte[], String> input = topologyTestDriver | |
.createInputTopic(inputTopic, | |
new ByteArraySerializer(), | |
new StringSerializer()); | |
final TestOutputTopic<byte[], String> output = topologyTestDriver | |
.createOutputTopic(outputTopic, | |
new ByteArrayDeserializer(), | |
new StringDeserializer()); | |
// | |
// Step 3: Produce some input data to the input topic. | |
// | |
input.pipeKeyValueList(inputs); | |
// | |
// Step 4: Verify the application's output data. | |
// | |
assertThat(output.readValuesToList(), equalTo(expectedValues)); | |
} | |
} | |
@Test | |
public void shouldForgetAfterExpiry() { | |
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8" | |
final String secondId = UUID.randomUUID().toString(); | |
final String thirdId = UUID.randomUUID().toString(); | |
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); | |
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId, | |
thirdId, thirdId); | |
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList()); | |
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId, thirdId); | |
// | |
// Step 1: Configure and start the processor topology. | |
// | |
final StreamsBuilder builder = new StreamsBuilder(); | |
final Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
// Use a temporary directory for storing state, which will be automatically removed after the test. | |
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); | |
// How long we "remember" an event. During this time, any incoming duplicates of the event | |
// will be, well, dropped, thereby de-duplicating the input data. | |
// | |
// The actual value depends on your use case. To reduce memory and disk usage, you could | |
// decrease the size to purge old windows more frequently at the cost of potentially missing out | |
// on de-duplicating late-arriving records. | |
final Duration windowSize = Duration.ofMinutes(10); | |
// retention period must be at least window size -- for this use case, we don't need a longer retention period | |
// and thus just use the window size as retention time | |
final Duration retentionPeriod = windowSize; | |
final String inputTopic = "inputTopic"; | |
final String outputTopic = "outputTopic"; | |
final KStream<byte[], String> stream = builder.stream(inputTopic); | |
DedupeValueForKey transformerSupplier = new DedupeValueForKey( | |
(value1, value2) -> value1.equals(value2), | |
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde()) | |
); | |
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis()); | |
final KStream<byte[], String> deduplicated = stream.transform( | |
// In this example, we assume that the record value as-is represents a unique event ID by | |
// which we can perform de-duplication. If your records are different, adapt the comparison | |
// function as needed | |
transformerSupplier | |
); | |
deduplicated.to(outputTopic); | |
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) { | |
// | |
// Step 2: Setup input and output topics. | |
// | |
final TestInputTopic<byte[], String> input = topologyTestDriver | |
.createInputTopic(inputTopic, | |
new ByteArraySerializer(), | |
new StringSerializer()); | |
final TestOutputTopic<byte[], String> output = topologyTestDriver | |
.createOutputTopic(outputTopic, | |
new ByteArrayDeserializer(), | |
new StringDeserializer()); | |
// | |
// Step 3: Produce some input data to the input topic. | |
// | |
input.pipeKeyValueList(inputs); | |
// | |
// Step 4: Advance wall clock time | |
// | |
topologyTestDriver.advanceWallClockTime(windowSize.plus(Duration.ofMillis(DedupeTransformer.CLEAR_INTERVAL_MILLIS))); | |
// | |
// Step 5: Send another | |
// | |
input.pipeInput(keyId, thirdId); | |
// | |
// Step 6: Verify the application's output data. | |
// | |
assertThat(output.readValuesToList(), equalTo(expectedValues)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright Confluent Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.confluent.examples.streams; | |
import org.apache.kafka.common.serialization.*; | |
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; | |
import org.apache.kafka.common.utils.Bytes; | |
import org.apache.kafka.streams.*; | |
import org.apache.kafka.streams.kstream.*; | |
import org.apache.kafka.streams.kstream.internals.MaterializedInternal; | |
import org.apache.kafka.streams.processor.ProcessorContext; | |
import org.apache.kafka.streams.processor.PunctuationType; | |
import org.apache.kafka.streams.state.*; | |
import org.apache.kafka.test.TestUtils; | |
import org.junit.Test; | |
import java.nio.charset.StandardCharsets; | |
import java.time.Duration; | |
import java.util.*; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.BiFunction; | |
import java.util.stream.Collectors; | |
import static org.hamcrest.CoreMatchers.equalTo; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
/** | |
* End-to-end integration test that demonstrates how to remove duplicate records from an input | |
* stream. | |
* <p> | |
* Here, a stateful {@link Transformer} (from the Processor API) | |
* detects and discards duplicate input records based on an "event id" that is embedded in each | |
* input record. This transformer is then included in a topology defined via the DSL. | |
* <p> | |
* In this simplified example, the values of input records represent the event ID by which | |
* duplicates will be detected. In practice, record values would typically be a more complex data | |
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID | |
* is but one example of how to perform de-duplication in general. The code example below can be | |
* adapted to other de-duplication approaches. | |
* <p> | |
* Note: This example uses lambda expressions and thus works with Java 8+ only. | |
*/ | |
public class DeduplicationValueTransformerIntegrationTest { | |
private static final String storeName = "eventId-store"; | |
/** | |
* Discards duplicate records from the input stream. Uses a ValueTransformerWithKey instead of Transformer | |
* to remove the need for re-partitioning in the KStreams DSL due to a possible key change. | |
* <p> | |
* Duplicates are determined by a function passed to it on creation to allow for custom comparisons, that compares | |
* new values with the previous. | |
* <p> | |
* Duplicate records are detected based on an value. The transformer remembers the last value for each key in a state | |
* store, which automatically purges/expires values from the store after a certain amount of | |
* time has passed to prevent the store from growing indefinitely. This assumes the consumer can tolerate some duplicates. | |
* This processor's purpose is to reduce the 'noise' caused by a lot of duplicate messages. | |
* <p> | |
* Note: This code is for demonstration purposes and was not tested for production usage. | |
*/ | |
private static class DedupeValueTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>> { | |
private ProcessorContext context; | |
/** | |
* Key: event ID | |
* Value: timestamp (event-time) of the corresponding event when the event ID was seen for the | |
* first time | |
*/ | |
private TimestampedKeyValueStore<K, V> eventStore; | |
public static final int CLEAR_INTERVAL_MILLIS = 1000; | |
private final long maintainDurationMs; | |
private final BiFunction<V, V, Boolean> valueComparator; | |
/** | |
*/ | |
DedupeValueTransformer(BiFunction<V, V, Boolean> valueComparator, long maintainDurationMs) { | |
this.valueComparator = valueComparator; | |
this.maintainDurationMs = maintainDurationMs; | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public void init(final ProcessorContext context) { | |
this.context = context; | |
eventStore = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName); | |
this.context.schedule(Duration.ofMillis(CLEAR_INTERVAL_MILLIS), PunctuationType.WALL_CLOCK_TIME, this::clearExpired); | |
} | |
private void clearExpired(long currentStreamTimeMs) { | |
try (KeyValueIterator<K, ValueAndTimestamp<V>> iterator = eventStore.all()) { | |
while (iterator.hasNext()) { | |
final KeyValue<K, ValueAndTimestamp<V>> entry = iterator.next(); | |
final long eventTimestamp = entry.value.timestamp(); | |
if (hasExpired(eventTimestamp, currentStreamTimeMs)) { | |
eventStore.delete(entry.key); | |
} | |
} | |
} | |
} | |
private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) { | |
return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs; | |
} | |
@Override | |
public Iterable<V> transform(K key, V value) { | |
if (value == null) { | |
return Collections.emptyList(); | |
} else { | |
final Iterable<V> output; | |
ValueAndTimestamp<V> storedValue = eventStore.get(key); | |
if (context.timestamp() < storedValue.timestamp()) { | |
// message is older than last stored, ignoring | |
output = Collections.emptyList(); | |
} else if (isDuplicate(key, value, storedValue)) { | |
output = Collections.emptyList(); | |
updateTimestampOfExistingEventToPreventExpiry(key, storedValue.value(), context.timestamp()); | |
} else { | |
output = Collections.singleton(value); | |
rememberNewEvent(key, value, context.timestamp()); | |
} | |
return output; | |
} | |
} | |
private boolean isDuplicate(final K key, final V value, ValueAndTimestamp<V> storedValue) { | |
boolean isDuplicate = false; | |
if (storedValue != null) { | |
V previous = storedValue.value(); | |
isDuplicate = (valueComparator.apply(previous, value) == Boolean.TRUE); | |
} | |
return isDuplicate; | |
} | |
private void updateTimestampOfExistingEventToPreventExpiry(final K key, final V value, final long newTimestamp) { | |
eventStore.put(key, ValueAndTimestamp.make(value, newTimestamp)); | |
} | |
private void rememberNewEvent(final K key, final V value, final long timestamp) { | |
eventStore.put(key, ValueAndTimestamp.make(value, timestamp)); | |
} | |
@Override | |
public void close() { | |
// Note: The store should NOT be closed manually here via `eventStore.close()`! | |
// The Kafka Streams API will automatically close stores when necessary. | |
} | |
} | |
private class DedupeValueWithKey<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>> { | |
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized; | |
private final BiFunction<V, V, Boolean> valueComparator; | |
private long maintainDurationMs = TimeUnit.HOURS.toMillis(30); | |
public DedupeValueWithKey(BiFunction<V, V, Boolean> valueComparator, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { | |
this.materialized = new MaterializedInternal<>(materialized, null, null); | |
// can't pass in name provider | |
// new MaterializedInternal<>(materialized, builder, REDUCE_NAME); | |
this.valueComparator = valueComparator; | |
} | |
@Override | |
public ValueTransformerWithKey<K, V, Iterable<V>> get() { | |
return new DedupeValueTransformer( | |
valueComparator, | |
maintainDurationMs | |
); | |
} | |
// provide store(s) that will be added and connected to the associated transformer | |
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext | |
@Override | |
public Set<StoreBuilder<?>> stores() { | |
KeyValueBytesStoreSupplier supplier = Stores.persistentTimestampedKeyValueStore(storeName); | |
StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder( | |
supplier, | |
materialized.keySerde(), | |
materialized.valueSerde()); | |
return Collections.singleton(builder); | |
} | |
public void setMaintainDurationMs(long maintainDurationMs) { | |
this.maintainDurationMs = maintainDurationMs; | |
} | |
} | |
@Test | |
public void shouldRemoveDuplicatesFromTheInput() { | |
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8" | |
final String secondId = UUID.randomUUID().toString(); | |
final String thirdId = UUID.randomUUID().toString(); | |
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); | |
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId, | |
thirdId, thirdId); | |
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList()); | |
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId); | |
// | |
// Step 1: Configure and start the processor topology. | |
// | |
final StreamsBuilder builder = new StreamsBuilder(); | |
final Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
// Use a temporary directory for storing state, which will be automatically removed after the test. | |
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); | |
// How long we "remember" an event. During this time, any incoming duplicates of the event | |
// will be, well, dropped, thereby de-duplicating the input data. | |
// | |
// The actual value depends on your use case. To reduce memory and disk usage, you could | |
// decrease the size to purge old windows more frequently at the cost of potentially missing out | |
// on de-duplicating late-arriving records. | |
final Duration windowSize = Duration.ofMinutes(10); | |
// retention period must be at least window size -- for this use case, we don't need a longer retention period | |
// and thus just use the window size as retention time | |
final Duration retentionPeriod = windowSize; | |
final String inputTopic = "inputTopic"; | |
final String outputTopic = "outputTopic"; | |
final KStream<byte[], String> stream = builder.stream(inputTopic); | |
DedupeValueWithKey transformerSupplier = new DedupeValueWithKey( | |
(value1, value2) -> value1.equals(value2), | |
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde()) | |
); | |
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis()); | |
final KStream<byte[], String> deduplicated = stream.flatTransformValues( | |
// In this example, we assume that the record value as-is represents a unique event ID by | |
// which we can perform de-duplication. If your records are different, adapt the comparison | |
// function as needed | |
transformerSupplier | |
); | |
deduplicated.to(outputTopic); | |
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) { | |
// | |
// Step 2: Setup input and output topics. | |
// | |
final TestInputTopic<byte[], String> input = topologyTestDriver | |
.createInputTopic(inputTopic, | |
new ByteArraySerializer(), | |
new StringSerializer()); | |
final TestOutputTopic<byte[], String> output = topologyTestDriver | |
.createOutputTopic(outputTopic, | |
new ByteArrayDeserializer(), | |
new StringDeserializer()); | |
// | |
// Step 3: Produce some input data to the input topic. | |
// | |
input.pipeKeyValueList(inputs); | |
// | |
// Step 4: Verify the application's output data. | |
// | |
assertThat(output.readValuesToList(), equalTo(expectedValues)); | |
} | |
} | |
@Test | |
public void shouldForgetAfterExpiry() { | |
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8" | |
final String secondId = UUID.randomUUID().toString(); | |
final String thirdId = UUID.randomUUID().toString(); | |
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); | |
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId, | |
thirdId, thirdId); | |
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList()); | |
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId, thirdId); | |
// | |
// Step 1: Configure and start the processor topology. | |
// | |
final StreamsBuilder builder = new StreamsBuilder(); | |
final Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
// Use a temporary directory for storing state, which will be automatically removed after the test. | |
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); | |
// How long we "remember" an event. During this time, any incoming duplicates of the event | |
// will be, well, dropped, thereby de-duplicating the input data. | |
// | |
// The actual value depends on your use case. To reduce memory and disk usage, you could | |
// decrease the size to purge old windows more frequently at the cost of potentially missing out | |
// on de-duplicating late-arriving records. | |
final Duration windowSize = Duration.ofMinutes(10); | |
// retention period must be at least window size -- for this use case, we don't need a longer retention period | |
// and thus just use the window size as retention time | |
final Duration retentionPeriod = windowSize; | |
final String inputTopic = "inputTopic"; | |
final String outputTopic = "outputTopic"; | |
final KStream<byte[], String> stream = builder.stream(inputTopic); | |
DedupeValueWithKey transformerSupplier = new DedupeValueWithKey( | |
(value1, value2) -> value1.equals(value2), | |
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde()) | |
); | |
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis()); | |
final KStream<byte[], String> deduplicated = stream.flatTransformValues( | |
// In this example, we assume that the record value as-is represents a unique event ID by | |
// which we can perform de-duplication. If your records are different, adapt the comparison | |
// function as needed | |
transformerSupplier | |
); | |
deduplicated.to(outputTopic); | |
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) { | |
// | |
// Step 2: Setup input and output topics. | |
// | |
final TestInputTopic<byte[], String> input = topologyTestDriver | |
.createInputTopic(inputTopic, | |
new ByteArraySerializer(), | |
new StringSerializer()); | |
final TestOutputTopic<byte[], String> output = topologyTestDriver | |
.createOutputTopic(outputTopic, | |
new ByteArrayDeserializer(), | |
new StringDeserializer()); | |
// | |
// Step 3: Produce some input data to the input topic. | |
// | |
input.pipeKeyValueList(inputs); | |
// | |
// Step 4: Advance wall clock time | |
// | |
topologyTestDriver.advanceWallClockTime(windowSize.plus(Duration.ofMillis(DedupeValueTransformer.CLEAR_INTERVAL_MILLIS))); | |
// | |
// Step 5: Send another | |
// | |
input.pipeInput(keyId, thirdId); | |
// | |
// Step 6: Verify the application's output data. | |
// | |
assertThat(output.readValuesToList(), equalTo(expectedValues)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright Confluent Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.confluent.examples.streams; | |
import org.apache.kafka.common.serialization.*; | |
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; | |
import org.apache.kafka.common.utils.Bytes; | |
import org.apache.kafka.streams.*; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.kstream.Materialized; | |
import org.apache.kafka.streams.kstream.Transformer; | |
import org.apache.kafka.streams.kstream.TransformerSupplier; | |
import org.apache.kafka.streams.kstream.internals.MaterializedInternal; | |
import org.apache.kafka.streams.processor.ProcessorContext; | |
import org.apache.kafka.streams.state.*; | |
import org.apache.kafka.test.TestUtils; | |
import org.junit.Test; | |
import java.nio.charset.StandardCharsets; | |
import java.time.Duration; | |
import java.util.*; | |
import java.util.function.BiFunction; | |
import java.util.stream.Collectors; | |
import static org.hamcrest.CoreMatchers.equalTo; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
/** | |
* End-to-end integration test that demonstrates how to remove duplicate records from an input | |
* stream. | |
* <p> | |
* Here, a stateful {@link org.apache.kafka.streams.kstream.Transformer} (from the Processor API) | |
* detects and discards duplicate input records based on an "event id" that is embedded in each | |
* input record. This transformer is then included in a topology defined via the DSL. | |
* <p> | |
* In this simplified example, the values of input records represent the event ID by which | |
* duplicates will be detected. In practice, record values would typically be a more complex data | |
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID | |
* is but one example of how to perform de-duplication in general. The code example below can be | |
* adapted to other de-duplication approaches. | |
* <p> | |
* Note: This example uses lambda expressions and thus works with Java 8+ only. | |
*/ | |
public class EventDeduplicationLambdaIntegrationTest { | |
private static final String storeName = "eventId-store"; | |
/** | |
* Discards duplicate records from the input stream. | |
* <p> | |
* Duplicate records are detected based on an event ID; in this simplified example, the record | |
* value is the event ID. The transformer remembers known event IDs in an associated window state | |
* store, which automatically purges/expires event IDs from the store after a certain amount of | |
* time has passed to prevent the store from growing indefinitely. | |
* <p> | |
* Note: This code is for demonstration purposes and was not tested for production usage. | |
*/ | |
private static class DedupeTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> { | |
private ProcessorContext context; | |
/** | |
* Key: event ID | |
* Value: timestamp (event-time) of the corresponding event when the event ID was seen for the | |
* first time | |
*/ | |
private WindowStore<K, V> eventStore; | |
private final long leftDurationMs; | |
private final long rightDurationMs; | |
private final BiFunction<V, V, Boolean> valueComparator; | |
/**2 | |
* @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event | |
* ID), during the time of which any incoming duplicates of | |
* the event will be dropped, thereby de-duplicating the | |
* input. | |
* @param valueComparator compares the value of previous record to current | |
*/ | |
DedupeTransformer(BiFunction<V, V, Boolean> valueComparator, final long maintainDurationPerEventInMs) { | |
if (maintainDurationPerEventInMs < 1) { | |
throw new IllegalArgumentException("maintain duration per event must be >= 1"); | |
} | |
leftDurationMs = maintainDurationPerEventInMs / 2; | |
rightDurationMs = maintainDurationPerEventInMs - leftDurationMs; | |
this.valueComparator = valueComparator; | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public void init(final ProcessorContext context) { | |
this.context = context; | |
eventStore = (WindowStore<K, V>) context.getStateStore(storeName); | |
} | |
public KeyValue<K, V> transform(final K key, final V value) { | |
if (value == null) { | |
return KeyValue.pair(key, null); | |
} else { | |
final KeyValue<K, V> output; | |
if (isDuplicate(key, value)) { | |
output = null; | |
updateTimestampOfExistingEventToPreventExpiry(key, value, context.timestamp()); | |
} else { | |
output = KeyValue.pair(key, value); | |
rememberNewEvent(key, value, context.timestamp()); | |
} | |
return output; | |
} | |
} | |
private boolean isDuplicate(final K key, final V value) { | |
boolean isDuplicate = false; | |
final long eventTime = context.timestamp(); | |
final WindowStoreIterator<V> timeIterator = eventStore.fetch( | |
key, | |
eventTime - leftDurationMs, | |
eventTime + rightDurationMs); | |
if (!timeIterator.hasNext()) { | |
return false; | |
} | |
KeyValue<Long, V> storedValue = timeIterator.next(); | |
if (storedValue != null) { | |
V previous = storedValue.value; | |
isDuplicate = (valueComparator.apply(previous, value) == Boolean.TRUE); | |
} | |
return isDuplicate; | |
} | |
private void updateTimestampOfExistingEventToPreventExpiry(final K key, final V value, final long newTimestamp) { | |
eventStore.put(key, value, newTimestamp); | |
} | |
private void rememberNewEvent(final K key, final V value, final long timestamp) { | |
eventStore.put(key, value, timestamp); | |
} | |
@Override | |
public void close() { | |
// Note: The store should NOT be closed manually here via `eventStore.close()`! | |
// The Kafka Streams API will automatically close stores when necessary. | |
} | |
} | |
private class DedupeValueForKey<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> { | |
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized; | |
private final BiFunction<V, V, Boolean> valueComparator; | |
private final Duration retentionPeriod; | |
public DedupeValueForKey(BiFunction<V, V, Boolean> valueComparator, Duration retentionPeriod, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { | |
this.materialized = new MaterializedInternal<>(materialized, null, null); | |
// can't pass in name provider | |
// new MaterializedInternal<>(materialized, builder, REDUCE_NAME); | |
this.valueComparator = valueComparator; | |
this.retentionPeriod = retentionPeriod; | |
} | |
@Override | |
public Transformer<K, V, KeyValue<K, V>> get() { | |
return new DedupeTransformer( | |
valueComparator, | |
retentionPeriod.toMillis() | |
); | |
} | |
// provide store(s) that will be added and connected to the associated transformer | |
@Override | |
public Set<StoreBuilder<?>> stores() { | |
WindowBytesStoreSupplier supplier = Stores.persistentWindowStore(storeName, | |
retentionPeriod, | |
retentionPeriod, | |
false | |
); | |
StoreBuilder<WindowStore<K, V>> builder = Stores.windowStoreBuilder( | |
supplier, | |
materialized.keySerde(), | |
materialized.valueSerde()); | |
return Collections.singleton(builder); | |
} | |
} | |
@Test | |
public void shouldRemoveDuplicatesFromTheInput() { | |
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8" | |
final String secondId = UUID.randomUUID().toString(); | |
final String thirdId = UUID.randomUUID().toString(); | |
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); | |
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId, | |
thirdId, thirdId); | |
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList()); | |
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId); | |
// | |
// Step 1: Configure and start the processor topology. | |
// | |
final StreamsBuilder builder = new StreamsBuilder(); | |
final Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
// Use a temporary directory for storing state, which will be automatically removed after the test. | |
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); | |
// How long we "remember" an event. During this time, any incoming duplicates of the event | |
// will be, well, dropped, thereby de-duplicating the input data. | |
// | |
// The actual value depends on your use case. To reduce memory and disk usage, you could | |
// decrease the size to purge old windows more frequently at the cost of potentially missing out | |
// on de-duplicating late-arriving records. | |
final Duration windowSize = Duration.ofMinutes(10); | |
// retention period must be at least window size -- for this use case, we don't need a longer retention period | |
// and thus just use the window size as retention time | |
final Duration retentionPeriod = windowSize; | |
final String inputTopic = "inputTopic"; | |
final String outputTopic = "outputTopic"; | |
final KStream<byte[], String> stream = builder.stream(inputTopic); | |
final KStream<byte[], String> deduplicated = stream.transform( | |
// In this example, we assume that the record value as-is represents a unique event ID by | |
// which we can perform de-duplication. If your records are different, adapt the comparison | |
// function as needed | |
new DedupeValueForKey( | |
(value1, value2) -> value1.equals(value2), | |
retentionPeriod, | |
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde()) | |
) | |
); | |
deduplicated.to(outputTopic); | |
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) { | |
// | |
// Step 2: Setup input and output topics. | |
// | |
final TestInputTopic<byte[], String> input = topologyTestDriver | |
.createInputTopic(inputTopic, | |
new ByteArraySerializer(), | |
new StringSerializer()); | |
final TestOutputTopic<byte[], String> output = topologyTestDriver | |
.createOutputTopic(outputTopic, | |
new ByteArrayDeserializer(), | |
new StringDeserializer()); | |
// | |
// Step 3: Produce some input data to the input topic. | |
// | |
input.pipeKeyValueList(inputs); | |
// | |
// Step 4: Verify the application's output data. | |
// | |
assertThat(output.readValuesToList(), equalTo(expectedValues)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment