Created
October 8, 2024 21:05
-
-
Save rodesai/732707d3b158eebe9949ea8952e2b884 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.responsive.demo; | |
import static dev.responsive.kafka.api.async.AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier; | |
import dev.responsive.kafka.api.ResponsiveKafkaStreams; | |
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; | |
import dev.responsive.kafka.api.stores.ResponsiveStores; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.Collections; | |
import java.util.Properties; | |
import java.util.Set; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.Topology; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.processor.api.FixedKeyProcessor; | |
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; | |
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; | |
import org.apache.kafka.streams.processor.api.FixedKeyRecord; | |
import org.apache.kafka.streams.state.KeyValueStore; | |
import org.apache.kafka.streams.state.StoreBuilder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class DedupDemo { | |
private static final Logger LOG = LoggerFactory.getLogger(Main.class); | |
private static final String APPLICATION_ID = "test-app"; | |
private static final String DEDUP_FACT_STORE = APPLICATION_ID + "-dedup-fact-store"; | |
public static final String DEDUP_INPUT_STREAM = APPLICATION_ID + "-dedup-input"; | |
public static final String DEDUP_OUTPUT = APPLICATION_ID + "-dedup-output"; | |
public static void main(final String[] args) throws Exception { | |
final Properties props = loadConfig(); | |
try { | |
final Topology topology; | |
topology = buildDeduplicatorTopology(); | |
LOG.info("Built Kafka Streams topology:\n{}", topology.describe().toString()); | |
final KafkaStreams streams = new ResponsiveKafkaStreams(topology, props); | |
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); | |
LOG.info("Starting Responsive Kafka Streams"); | |
streams.start(); | |
} catch (final Exception e) { | |
LOG.error("Shutting down application early due to error", e); | |
System.err.println("Exiting JVM due to " + e.getMessage()); | |
System.exit(1); | |
} | |
} | |
static Topology buildDeduplicatorTopology() { | |
final StreamsBuilder builder = new StreamsBuilder(); | |
final KStream<String, String> inputStream = builder.stream(DEDUP_INPUT_STREAM); | |
final var deduplicatorProcessorSupplier = new DeduplicatorProcessorSupplier(); | |
final KStream<String, String> deduplicatedStream = inputStream | |
.processValues( | |
createAsyncProcessorSupplier(deduplicatorProcessorSupplier), | |
DEDUP_FACT_STORE | |
); | |
deduplicatedStream.to(DEDUP_OUTPUT); | |
return builder.build(); | |
} | |
public static class DeduplicatorProcessorSupplier | |
implements FixedKeyProcessorSupplier<String, String, String> { | |
@Override | |
public FixedKeyProcessor<String, String, String> get() { | |
return new FixedKeyProcessor<>() { | |
private FixedKeyProcessorContext<String, String> context; | |
private KeyValueStore<String, String> factStore; | |
@Override | |
public void init(final FixedKeyProcessorContext context) { | |
this.context = context; | |
factStore = context.getStateStore(DEDUP_FACT_STORE); | |
} | |
@Override | |
public void process(final FixedKeyRecord<String, String> record) { | |
final String existingValue = factStore.putIfAbsent(record.key(), record.value()); | |
if (existingValue == null) { | |
context.forward(record); | |
} | |
} | |
}; | |
} | |
@Override | |
public Set<StoreBuilder<?>> stores() { | |
return Collections.singleton(ResponsiveStores.keyValueStoreBuilder( | |
ResponsiveStores.keyValueStore(ResponsiveKeyValueParams.fact((DEDUP_FACT_STORE))), | |
Serdes.String(), | |
Serdes.String() | |
)); | |
} | |
} | |
static Properties loadConfig() throws IOException { | |
final Properties cfg = new Properties(); | |
try (InputStream inputStream = Main.class.getResourceAsStream("/app.properties")) { | |
cfg.load(inputStream); | |
} | |
return cfg; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment