Neha Narkhede nehanarkhede

nehanarkhede / kafka-transactions.java
Last active March 13, 2022 10:06
Exactly-once guarantees and transactions in Apache Kafka
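// Register this producer's transactional.id with the coordinator; call once per producer.
producer.initTransactions();
try {
    producer.beginTransaction();
    // Both sends are committed together or not at all.
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another producer with the same transactional.id has taken over; this instance cannot continue.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so that none of the sends in this transaction are committed.
    producer.abortTransaction();
}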
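
The snippet above assumes an already-configured `producer`. As a minimal sketch of the configuration it relies on: `initTransactions()` only works when the producer has a `transactional.id`. The class name, the `build` helper, the broker address, and the id value below are hypothetical and chosen for illustration; the property keys are standard Kafka producer settings. The sketch uses only `java.util.Properties`, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of the gist: builds producer settings for transactional use.
final class TransactionalProducerConfig {

    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Required for initTransactions(); should stay stable across restarts of the same logical producer.
        props.setProperty("transactional.id", transactionalId);
        // Transactions build on the idempotent producer; set explicitly here for clarity.
        props.setProperty("enable.idempotence", "true");
        // Assumption: String keys and values, matching nothing in particular in the gist.
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and id, for illustration only.
        Properties props = build("localhost:9092", "my-transactional-id");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka client available, the resulting `Properties` would be passed to `new KafkaProducer<>(props)` before calling `initTransactions()`.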
nehanarkhede / word-count-kafka-streams.java
Last active August 25, 2020 09:14
Word count program using Kafka Streams
// Uses the early Kafka Streams API (KStreamBuilder, countByKey), which later releases replaced.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
// Split on runs of non-word characters, with Unicode-aware character classes.
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KStream<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
    // Re-key each record by its word so that the count is per word.
    .map((key, word) -> new KeyValue<>(word, word))
    .countByKey("Counts")
    .toStream();
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
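
To make the result of the topology above concrete, here is a minimal plain-Java sketch of the same computation over an in-memory list of lines, with no Kafka involved. It reuses the gist's regular expression and lowercasing; the class name `WordCountSketch`, the method `countWords`, and the sample input are hypothetical. One deliberate difference: it skips empty tokens, which the gist's pipeline does not filter.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the streaming word count; processes a finite list instead of a topic.
final class WordCountSketch {

    // Same tokenizer as the topology: split on runs of non-word characters.
    private static final Pattern NON_WORD =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    static Map<String, Long> countWords(List<String> textLines) {
        // LinkedHashMap keeps words in first-seen order, which makes the output easy to read.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : textLines) {
            for (String word : NON_WORD.split(line.toLowerCase())) {
                if (word.isEmpty()) {
                    continue; // split yields "" for empty lines or a leading separator
                }
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {hello=2, kafka=1, streams=1}
        System.out.println(countWords(Arrays.asList("Hello Kafka", "hello, streams!")));
    }
}
```

The streaming version differs in that it never finishes: each incoming line updates the running counts, and every update is emitted downstream as a new (word, count) record.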