Skip to content

Instantly share code, notes, and snippets.

@rodesai
Created October 8, 2024 21:05
Show Gist options
  • Save rodesai/732707d3b158eebe9949ea8952e2b884 to your computer and use it in GitHub Desktop.
Save rodesai/732707d3b158eebe9949ea8952e2b884 to your computer and use it in GitHub Desktop.
package dev.responsive.demo;
import static dev.responsive.kafka.api.async.AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo application that removes duplicate records from a Kafka topic.
 *
 * <p>Records are read from {@link #DEDUP_INPUT_STREAM}, passed through a
 * {@link DeduplicatorProcessorSupplier} wrapped with {@code createAsyncProcessorSupplier}, and
 * written to {@link #DEDUP_OUTPUT}. A record is forwarded only when its key is not yet present
 * in the {@code DEDUP_FACT_STORE} state store.
 */
public class DedupDemo {

  private static final Logger LOG = LoggerFactory.getLogger(DedupDemo.class);

  private static final String APPLICATION_ID = "test-app";
  private static final String DEDUP_FACT_STORE = APPLICATION_ID + "-dedup-fact-store";
  private static final String CONFIG_RESOURCE = "/app.properties";

  /** Name of the topic the deduplicator reads from. */
  public static final String DEDUP_INPUT_STREAM = APPLICATION_ID + "-dedup-input";

  /** Name of the topic the deduplicated records are written to. */
  public static final String DEDUP_OUTPUT = APPLICATION_ID + "-dedup-output";

  /**
   * Loads the configuration, builds the topology and starts the streams application.
   *
   * <p>Any exception raised while building or starting the application is logged and the JVM
   * exits with status 1.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if the configuration cannot be loaded
   */
  public static void main(final String[] args) throws Exception {
    final Properties props = loadConfig();
    try {
      final Topology topology = buildDeduplicatorTopology();
      LOG.info("Built Kafka Streams topology:\n{}", topology.describe());

      final KafkaStreams streams = new ResponsiveKafkaStreams(topology, props);
      // Close the streams instance when the JVM is shutting down.
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

      LOG.info("Starting Responsive Kafka Streams");
      streams.start();
    } catch (final Exception e) {
      LOG.error("Shutting down application early due to error", e);
      System.err.println("Exiting JVM due to " + e.getMessage());
      System.exit(1);
    }
  }

  /**
   * Builds the topology: input topic, then the deduplicating value processor, then output topic.
   *
   * <p>No serdes are specified for the source or sink here, so they are not determined by this
   * method.
   *
   * @return the built topology
   */
  static Topology buildDeduplicatorTopology() {
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> inputStream = builder.stream(DEDUP_INPUT_STREAM);

    final var deduplicatorProcessorSupplier = new DeduplicatorProcessorSupplier();
    final KStream<String, String> deduplicatedStream = inputStream.processValues(
        createAsyncProcessorSupplier(deduplicatorProcessorSupplier),
        DEDUP_FACT_STORE
    );

    deduplicatedStream.to(DEDUP_OUTPUT);
    return builder.build();
  }

  /**
   * Supplies processors that forward a record only the first time its key is seen, and declares
   * the state store those processors use to remember keys.
   */
  public static class DeduplicatorProcessorSupplier
      implements FixedKeyProcessorSupplier<String, String, String> {

    /**
     * Creates a new deduplicating processor.
     *
     * @return a processor backed by the {@code DEDUP_FACT_STORE} state store
     */
    @Override
    public FixedKeyProcessor<String, String, String> get() {
      return new FixedKeyProcessor<>() {
        private FixedKeyProcessorContext<String, String> context;
        private KeyValueStore<String, String> factStore;

        @Override
        public void init(final FixedKeyProcessorContext<String, String> context) {
          this.context = context;
          this.factStore = context.getStateStore(DEDUP_FACT_STORE);
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
          // NOTE(review): record.key() is handed to the store as-is; confirm that records with
          // a null key cannot reach this processor.
          final String existingValue = factStore.putIfAbsent(record.key(), record.value());
          // Forward only when the store reported no existing value for this key.
          if (existingValue == null) {
            context.forward(record);
          }
        }
      };
    }

    /**
     * Declares the state store used by the processors from {@link #get()}.
     *
     * @return a singleton set holding the builder for the {@code DEDUP_FACT_STORE} key-value
     *     store, created with {@code ResponsiveKeyValueParams.fact} and String serdes
     */
    @Override
    public Set<StoreBuilder<?>> stores() {
      return Collections.singleton(ResponsiveStores.keyValueStoreBuilder(
          ResponsiveStores.keyValueStore(ResponsiveKeyValueParams.fact(DEDUP_FACT_STORE)),
          Serdes.String(),
          Serdes.String()
      ));
    }
  }

  /**
   * Loads the application configuration from the {@code /app.properties} classpath resource.
   *
   * @return the loaded properties
   * @throws IOException if the resource is missing from the classpath or cannot be read
   */
  static Properties loadConfig() throws IOException {
    final Properties cfg = new Properties();
    try (InputStream inputStream = DedupDemo.class.getResourceAsStream(CONFIG_RESOURCE)) {
      // getResourceAsStream signals a missing resource by returning null rather than throwing.
      if (inputStream == null) {
        throw new IOException("Classpath resource " + CONFIG_RESOURCE + " not found");
      }
      cfg.load(inputStream);
    }
    return cfg;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment