Skip to content

Instantly share code, notes, and snippets.

  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save devgrok/10d02f67204fcf1a650be569a391ef3d to your computer and use it in GitHub Desktop.
Deduplicating in kafka-streams using the Processor API
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* End-to-end integration test that demonstrates how to remove duplicate records from an input
* stream.
* <p>
* Here, a stateful {@link Transformer} (from the Processor API)
* detects and discards duplicate input records based on an "event id" that is embedded in each
* input record. This transformer is then included in a topology defined via the DSL.
* <p>
* In this simplified example, the values of input records represent the event ID by which
* duplicates will be detected. In practice, record values would typically be a more complex data
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID
* is but one example of how to perform de-duplication in general. The code example below can be
* adapted to other de-duplication approaches.
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class DeduplicationKeyValueTransformerIntegrationTest {
private static final String storeName = "eventId-store";
/**
* Discards duplicate records from the input stream.
* <p>
* Duplicates are determined by a function passed to it on creation to allow for custom comparisons, that compares
* new values with the previous.
* <p>
* Duplicate records are detected based on an value. The transformer remembers the last value for each key in a state
* store, which automatically purges/expires values from the store after a certain amount of
* time has passed to prevent the store from growing indefinitely. This assumes the consumer can tolerate some duplicates.
* This processor's purpose is to reduce the 'noise' caused by a lot of duplicate messages.
* <p>
* Note: This code is for demonstration purposes and was not tested for production usage.
*/
private static class DedupeTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {
private ProcessorContext context;
/**
* Key: event ID
* Value: timestamp (event-time) of the corresponding event when the event ID was seen for the
* first time
*/
private TimestampedKeyValueStore<K, V> eventStore;
public static final int CLEAR_INTERVAL_MILLIS = 1000;
private final long maintainDurationMs;
private final BiFunction<V, V, Boolean> valueComparator;
/**
*/
DedupeTransformer(BiFunction<V, V, Boolean> valueComparator, long maintainDurationMs) {
this.valueComparator = valueComparator;
this.maintainDurationMs = maintainDurationMs;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
eventStore = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
this.context.schedule(Duration.ofMillis(CLEAR_INTERVAL_MILLIS), PunctuationType.WALL_CLOCK_TIME, this::clearExpired);
}
private void clearExpired(long currentStreamTimeMs) {
try (KeyValueIterator<K, ValueAndTimestamp<V>> iterator = eventStore.all()) {
while (iterator.hasNext()) {
final KeyValue<K, ValueAndTimestamp<V>> entry = iterator.next();
final long eventTimestamp = entry.value.timestamp();
if (hasExpired(eventTimestamp, currentStreamTimeMs)) {
eventStore.delete(entry.key);
}
}
}
}
private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
}
public KeyValue<K, V> transform(final K key, final V value) {
if (value == null) {
return KeyValue.pair(key, null);
} else {
final KeyValue<K, V> output;
if (isDuplicate(key, value)) {
output = null;
updateTimestampOfExistingEventToPreventExpiry(key, value, context.timestamp());
} else {
output = KeyValue.pair(key, value);
rememberNewEvent(key, value, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final K key, final V value) {
boolean isDuplicate = false;
ValueAndTimestamp<V> storedValue = eventStore.get(key);
if (storedValue != null) {
V previous = storedValue.value();
isDuplicate = (valueComparator.apply(previous, value) == Boolean.TRUE);
}
return isDuplicate;
}
private void updateTimestampOfExistingEventToPreventExpiry(final K key, final V value, final long newTimestamp) {
eventStore.put(key, ValueAndTimestamp.make(value, newTimestamp));
}
private void rememberNewEvent(final K key, final V value, final long timestamp) {
eventStore.put(key, ValueAndTimestamp.make(value, timestamp));
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `eventStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
}
private class DedupeValueForKey<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> {
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
private final BiFunction<V, V, Boolean> valueComparator;
private long maintainDurationMs = TimeUnit.HOURS.toMillis(30);
public DedupeValueForKey(BiFunction<V, V, Boolean> valueComparator, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
this.materialized = new MaterializedInternal<>(materialized, null, null);
// can't pass in name provider
// new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
this.valueComparator = valueComparator;
}
@Override
public Transformer<K, V, KeyValue<K, V>> get() {
return new DedupeTransformer(
valueComparator,
maintainDurationMs
);
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
@Override
public Set<StoreBuilder<?>> stores() {
KeyValueBytesStoreSupplier supplier = Stores.persistentTimestampedKeyValueStore(storeName);
StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde());
return Collections.singleton(builder);
}
public void setMaintainDurationMs(long maintainDurationMs) {
this.maintainDurationMs = maintainDurationMs;
}
}
@Test
public void shouldRemoveDuplicatesFromTheInput() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId,
thirdId, thirdId);
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList());
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId);
//
// Step 1: Configure and start the processor topology.
//
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
// How long we "remember" an event. During this time, any incoming duplicates of the event
// will be, well, dropped, thereby de-duplicating the input data.
//
// The actual value depends on your use case. To reduce memory and disk usage, you could
// decrease the size to purge old windows more frequently at the cost of potentially missing out
// on de-duplicating late-arriving records.
final Duration windowSize = Duration.ofMinutes(10);
// retention period must be at least window size -- for this use case, we don't need a longer retention period
// and thus just use the window size as retention time
final Duration retentionPeriod = windowSize;
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
final KStream<byte[], String> stream = builder.stream(inputTopic);
DedupeValueForKey transformerSupplier = new DedupeValueForKey(
(value1, value2) -> value1.equals(value2),
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
);
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis());
final KStream<byte[], String> deduplicated = stream.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the comparison
// function as needed
transformerSupplier
);
deduplicated.to(outputTopic);
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Setup input and output topics.
//
final TestInputTopic<byte[], String> input = topologyTestDriver
.createInputTopic(inputTopic,
new ByteArraySerializer(),
new StringSerializer());
final TestOutputTopic<byte[], String> output = topologyTestDriver
.createOutputTopic(outputTopic,
new ByteArrayDeserializer(),
new StringDeserializer());
//
// Step 3: Produce some input data to the input topic.
//
input.pipeKeyValueList(inputs);
//
// Step 4: Verify the application's output data.
//
assertThat(output.readValuesToList(), equalTo(expectedValues));
}
}
@Test
public void shouldForgetAfterExpiry() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId,
thirdId, thirdId);
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList());
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId, thirdId);
//
// Step 1: Configure and start the processor topology.
//
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
// How long we "remember" an event. During this time, any incoming duplicates of the event
// will be, well, dropped, thereby de-duplicating the input data.
//
// The actual value depends on your use case. To reduce memory and disk usage, you could
// decrease the size to purge old windows more frequently at the cost of potentially missing out
// on de-duplicating late-arriving records.
final Duration windowSize = Duration.ofMinutes(10);
// retention period must be at least window size -- for this use case, we don't need a longer retention period
// and thus just use the window size as retention time
final Duration retentionPeriod = windowSize;
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
final KStream<byte[], String> stream = builder.stream(inputTopic);
DedupeValueForKey transformerSupplier = new DedupeValueForKey(
(value1, value2) -> value1.equals(value2),
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
);
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis());
final KStream<byte[], String> deduplicated = stream.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the comparison
// function as needed
transformerSupplier
);
deduplicated.to(outputTopic);
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Setup input and output topics.
//
final TestInputTopic<byte[], String> input = topologyTestDriver
.createInputTopic(inputTopic,
new ByteArraySerializer(),
new StringSerializer());
final TestOutputTopic<byte[], String> output = topologyTestDriver
.createOutputTopic(outputTopic,
new ByteArrayDeserializer(),
new StringDeserializer());
//
// Step 3: Produce some input data to the input topic.
//
input.pipeKeyValueList(inputs);
//
// Step 4: Advance wall clock time
//
topologyTestDriver.advanceWallClockTime(windowSize.plus(Duration.ofMillis(DedupeTransformer.CLEAR_INTERVAL_MILLIS)));
//
// Step 5: Send another
//
input.pipeInput(keyId, thirdId);
//
// Step 6: Verify the application's output data.
//
assertThat(output.readValuesToList(), equalTo(expectedValues));
}
}
}
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* End-to-end integration test that demonstrates how to remove duplicate records from an input
* stream.
* <p>
* Here, a stateful {@link Transformer} (from the Processor API)
* detects and discards duplicate input records based on an "event id" that is embedded in each
* input record. This transformer is then included in a topology defined via the DSL.
* <p>
* In this simplified example, the values of input records represent the event ID by which
* duplicates will be detected. In practice, record values would typically be a more complex data
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID
* is but one example of how to perform de-duplication in general. The code example below can be
* adapted to other de-duplication approaches.
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class DeduplicationValueTransformerIntegrationTest {
private static final String storeName = "eventId-store";
/**
* Discards duplicate records from the input stream. Uses a ValueTransformerWithKey instead of Transformer
* to remove the need for re-partitioning in the KStreams DSL due to a possible key change.
* <p>
* Duplicates are determined by a function passed to it on creation to allow for custom comparisons, that compares
* new values with the previous.
* <p>
* Duplicate records are detected based on an value. The transformer remembers the last value for each key in a state
* store, which automatically purges/expires values from the store after a certain amount of
* time has passed to prevent the store from growing indefinitely. This assumes the consumer can tolerate some duplicates.
* This processor's purpose is to reduce the 'noise' caused by a lot of duplicate messages.
* <p>
* Note: This code is for demonstration purposes and was not tested for production usage.
*/
private static class DedupeValueTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>> {
private ProcessorContext context;
/**
* Key: event ID
* Value: timestamp (event-time) of the corresponding event when the event ID was seen for the
* first time
*/
private TimestampedKeyValueStore<K, V> eventStore;
public static final int CLEAR_INTERVAL_MILLIS = 1000;
private final long maintainDurationMs;
private final BiFunction<V, V, Boolean> valueComparator;
/**
*/
DedupeValueTransformer(BiFunction<V, V, Boolean> valueComparator, long maintainDurationMs) {
this.valueComparator = valueComparator;
this.maintainDurationMs = maintainDurationMs;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
eventStore = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
this.context.schedule(Duration.ofMillis(CLEAR_INTERVAL_MILLIS), PunctuationType.WALL_CLOCK_TIME, this::clearExpired);
}
private void clearExpired(long currentStreamTimeMs) {
try (KeyValueIterator<K, ValueAndTimestamp<V>> iterator = eventStore.all()) {
while (iterator.hasNext()) {
final KeyValue<K, ValueAndTimestamp<V>> entry = iterator.next();
final long eventTimestamp = entry.value.timestamp();
if (hasExpired(eventTimestamp, currentStreamTimeMs)) {
eventStore.delete(entry.key);
}
}
}
}
private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
}
@Override
public Iterable<V> transform(K key, V value) {
if (value == null) {
return Collections.emptyList();
} else {
final Iterable<V> output;
ValueAndTimestamp<V> storedValue = eventStore.get(key);
if (context.timestamp() < storedValue.timestamp()) {
// message is older than last stored, ignoring
output = Collections.emptyList();
} else if (isDuplicate(key, value, storedValue)) {
output = Collections.emptyList();
updateTimestampOfExistingEventToPreventExpiry(key, storedValue.value(), context.timestamp());
} else {
output = Collections.singleton(value);
rememberNewEvent(key, value, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final K key, final V value, ValueAndTimestamp<V> storedValue) {
boolean isDuplicate = false;
if (storedValue != null) {
V previous = storedValue.value();
isDuplicate = (valueComparator.apply(previous, value) == Boolean.TRUE);
}
return isDuplicate;
}
private void updateTimestampOfExistingEventToPreventExpiry(final K key, final V value, final long newTimestamp) {
eventStore.put(key, ValueAndTimestamp.make(value, newTimestamp));
}
private void rememberNewEvent(final K key, final V value, final long timestamp) {
eventStore.put(key, ValueAndTimestamp.make(value, timestamp));
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `eventStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
}
private class DedupeValueWithKey<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>> {
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
private final BiFunction<V, V, Boolean> valueComparator;
private long maintainDurationMs = TimeUnit.HOURS.toMillis(30);
public DedupeValueWithKey(BiFunction<V, V, Boolean> valueComparator, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
this.materialized = new MaterializedInternal<>(materialized, null, null);
// can't pass in name provider
// new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
this.valueComparator = valueComparator;
}
@Override
public ValueTransformerWithKey<K, V, Iterable<V>> get() {
return new DedupeValueTransformer(
valueComparator,
maintainDurationMs
);
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
@Override
public Set<StoreBuilder<?>> stores() {
KeyValueBytesStoreSupplier supplier = Stores.persistentTimestampedKeyValueStore(storeName);
StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde());
return Collections.singleton(builder);
}
public void setMaintainDurationMs(long maintainDurationMs) {
this.maintainDurationMs = maintainDurationMs;
}
}
@Test
public void shouldRemoveDuplicatesFromTheInput() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId,
thirdId, thirdId);
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList());
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId);
//
// Step 1: Configure and start the processor topology.
//
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
// How long we "remember" an event. During this time, any incoming duplicates of the event
// will be, well, dropped, thereby de-duplicating the input data.
//
// The actual value depends on your use case. To reduce memory and disk usage, you could
// decrease the size to purge old windows more frequently at the cost of potentially missing out
// on de-duplicating late-arriving records.
final Duration windowSize = Duration.ofMinutes(10);
// retention period must be at least window size -- for this use case, we don't need a longer retention period
// and thus just use the window size as retention time
final Duration retentionPeriod = windowSize;
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
final KStream<byte[], String> stream = builder.stream(inputTopic);
DedupeValueWithKey transformerSupplier = new DedupeValueWithKey(
(value1, value2) -> value1.equals(value2),
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
);
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis());
final KStream<byte[], String> deduplicated = stream.flatTransformValues(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the comparison
// function as needed
transformerSupplier
);
deduplicated.to(outputTopic);
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Setup input and output topics.
//
final TestInputTopic<byte[], String> input = topologyTestDriver
.createInputTopic(inputTopic,
new ByteArraySerializer(),
new StringSerializer());
final TestOutputTopic<byte[], String> output = topologyTestDriver
.createOutputTopic(outputTopic,
new ByteArrayDeserializer(),
new StringDeserializer());
//
// Step 3: Produce some input data to the input topic.
//
input.pipeKeyValueList(inputs);
//
// Step 4: Verify the application's output data.
//
assertThat(output.readValuesToList(), equalTo(expectedValues));
}
}
@Test
public void shouldForgetAfterExpiry() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId,
thirdId, thirdId);
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList());
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId, thirdId);
//
// Step 1: Configure and start the processor topology.
//
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
// How long we "remember" an event. During this time, any incoming duplicates of the event
// will be, well, dropped, thereby de-duplicating the input data.
//
// The actual value depends on your use case. To reduce memory and disk usage, you could
// decrease the size to purge old windows more frequently at the cost of potentially missing out
// on de-duplicating late-arriving records.
final Duration windowSize = Duration.ofMinutes(10);
// retention period must be at least window size -- for this use case, we don't need a longer retention period
// and thus just use the window size as retention time
final Duration retentionPeriod = windowSize;
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
final KStream<byte[], String> stream = builder.stream(inputTopic);
DedupeValueWithKey transformerSupplier = new DedupeValueWithKey(
(value1, value2) -> value1.equals(value2),
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
);
transformerSupplier.setMaintainDurationMs(retentionPeriod.toMillis());
final KStream<byte[], String> deduplicated = stream.flatTransformValues(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the comparison
// function as needed
transformerSupplier
);
deduplicated.to(outputTopic);
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Setup input and output topics.
//
final TestInputTopic<byte[], String> input = topologyTestDriver
.createInputTopic(inputTopic,
new ByteArraySerializer(),
new StringSerializer());
final TestOutputTopic<byte[], String> output = topologyTestDriver
.createOutputTopic(outputTopic,
new ByteArrayDeserializer(),
new StringDeserializer());
//
// Step 3: Produce some input data to the input topic.
//
input.pipeKeyValueList(inputs);
//
// Step 4: Advance wall clock time
//
topologyTestDriver.advanceWallClockTime(windowSize.plus(Duration.ofMillis(DedupeValueTransformer.CLEAR_INTERVAL_MILLIS)));
//
// Step 5: Send another
//
input.pipeInput(keyId, thirdId);
//
// Step 6: Verify the application's output data.
//
assertThat(output.readValuesToList(), equalTo(expectedValues));
}
}
}
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* End-to-end integration test that demonstrates how to remove duplicate records from an input
* stream.
* <p>
* Here, a stateful {@link org.apache.kafka.streams.kstream.Transformer} (from the Processor API)
* detects and discards duplicate input records based on an "event id" that is embedded in each
* input record. This transformer is then included in a topology defined via the DSL.
* <p>
* In this simplified example, the values of input records represent the event ID by which
* duplicates will be detected. In practice, record values would typically be a more complex data
* structure, with perhaps one of the fields being such an event ID. De-duplication by an event ID
* is but one example of how to perform de-duplication in general. The code example below can be
* adapted to other de-duplication approaches.
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class EventDeduplicationLambdaIntegrationTest {
private static final String storeName = "eventId-store";
/**
* Discards duplicate records from the input stream.
* <p>
* Duplicate records are detected based on an event ID; in this simplified example, the record
* value is the event ID. The transformer remembers known event IDs in an associated window state
* store, which automatically purges/expires event IDs from the store after a certain amount of
* time has passed to prevent the store from growing indefinitely.
* <p>
* Note: This code is for demonstration purposes and was not tested for production usage.
*/
private static class DedupeTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {
private ProcessorContext context;
/**
* Key: event ID
* Value: timestamp (event-time) of the corresponding event when the event ID was seen for the
* first time
*/
private WindowStore<K, V> eventStore;
private final long leftDurationMs;
private final long rightDurationMs;
private final BiFunction<V, V, Boolean> valueComparator;
/**2
* @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
* ID), during the time of which any incoming duplicates of
* the event will be dropped, thereby de-duplicating the
* input.
* @param valueComparator compares the value of previous record to current
*/
DedupeTransformer(BiFunction<V, V, Boolean> valueComparator, final long maintainDurationPerEventInMs) {
if (maintainDurationPerEventInMs < 1) {
throw new IllegalArgumentException("maintain duration per event must be >= 1");
}
leftDurationMs = maintainDurationPerEventInMs / 2;
rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
this.valueComparator = valueComparator;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
eventStore = (WindowStore<K, V>) context.getStateStore(storeName);
}
public KeyValue<K, V> transform(final K key, final V value) {
if (value == null) {
return KeyValue.pair(key, null);
} else {
final KeyValue<K, V> output;
if (isDuplicate(key, value)) {
output = null;
updateTimestampOfExistingEventToPreventExpiry(key, value, context.timestamp());
} else {
output = KeyValue.pair(key, value);
rememberNewEvent(key, value, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final K key, final V value) {
boolean isDuplicate = false;
final long eventTime = context.timestamp();
final WindowStoreIterator<V> timeIterator = eventStore.fetch(
key,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
if (!timeIterator.hasNext()) {
return false;
}
KeyValue<Long, V> storedValue = timeIterator.next();
if (storedValue != null) {
V previous = storedValue.value;
isDuplicate = (valueComparator.apply(previous, value) == Boolean.TRUE);
}
return isDuplicate;
}
private void updateTimestampOfExistingEventToPreventExpiry(final K key, final V value, final long newTimestamp) {
eventStore.put(key, value, newTimestamp);
}
private void rememberNewEvent(final K key, final V value, final long timestamp) {
eventStore.put(key, value, timestamp);
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `eventStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
}
private class DedupeValueForKey<K, V> implements TransformerSupplier<K, V, KeyValue<K, V>> {
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
private final BiFunction<V, V, Boolean> valueComparator;
private final Duration retentionPeriod;
public DedupeValueForKey(BiFunction<V, V, Boolean> valueComparator, Duration retentionPeriod, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
this.materialized = new MaterializedInternal<>(materialized, null, null);
// can't pass in name provider
// new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
this.valueComparator = valueComparator;
this.retentionPeriod = retentionPeriod;
}
@Override
public Transformer<K, V, KeyValue<K, V>> get() {
return new DedupeTransformer(
valueComparator,
retentionPeriod.toMillis()
);
}
// provide store(s) that will be added and connected to the associated transformer
@Override
public Set<StoreBuilder<?>> stores() {
WindowBytesStoreSupplier supplier = Stores.persistentWindowStore(storeName,
retentionPeriod,
retentionPeriod,
false
);
StoreBuilder<WindowStore<K, V>> builder = Stores.windowStoreBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde());
return Collections.singleton(builder);
}
}
@Test
public void shouldRemoveDuplicatesFromTheInput() {
final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
final String secondId = UUID.randomUUID().toString();
final String thirdId = UUID.randomUUID().toString();
final byte[] keyId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
final List<String> inputValues = Arrays.asList(firstId, secondId, secondId, thirdId,
thirdId, thirdId);
final List<KeyValue<byte[], String>> inputs = inputValues.stream().map(v -> new KeyValue<>(keyId, v)).collect(Collectors.toList());
final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId);
//
// Step 1: Configure and start the processor topology.
//
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
// How long we "remember" an event. During this time, any incoming duplicates of the event
// will be, well, dropped, thereby de-duplicating the input data.
//
// The actual value depends on your use case. To reduce memory and disk usage, you could
// decrease the size to purge old windows more frequently at the cost of potentially missing out
// on de-duplicating late-arriving records.
final Duration windowSize = Duration.ofMinutes(10);
// retention period must be at least window size -- for this use case, we don't need a longer retention period
// and thus just use the window size as retention time
final Duration retentionPeriod = windowSize;
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
final KStream<byte[], String> stream = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = stream.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the comparison
// function as needed
new DedupeValueForKey(
(value1, value2) -> value1.equals(value2),
retentionPeriod,
Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
)
);
deduplicated.to(outputTopic);
try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Setup input and output topics.
//
final TestInputTopic<byte[], String> input = topologyTestDriver
.createInputTopic(inputTopic,
new ByteArraySerializer(),
new StringSerializer());
final TestOutputTopic<byte[], String> output = topologyTestDriver
.createOutputTopic(outputTopic,
new ByteArrayDeserializer(),
new StringDeserializer());
//
// Step 3: Produce some input data to the input topic.
//
input.pipeKeyValueList(inputs);
//
// Step 4: Verify the application's output data.
//
assertThat(output.readValuesToList(), equalTo(expectedValues));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment