Last active April 26, 2023 09:59
Gist: apurvam/db694549dd149aca1bd6c67ed835f4ea
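// createKafkaProducer, createKafkaConsumer, producerRecord and currentOffsets
// are placeholder helpers that this snippet leaves undefined.
// String keys and values are assumed for illustration.
KafkaProducer<String, String> producer = createKafkaProducer(
    "bootstrap.servers", "localhost:9092",
    "transactional.id", "my-transactional-id");
// Register the transactional.id and fence off older producers that used it.
producer.initTransactions();

KafkaConsumer<String, String> consumer = createKafkaConsumer(
    "bootstrap.servers", "localhost:9092",
    "group.id", "my-group-id",
    "isolation.level", "read_committed");
consumer.subscribe(singleton("inputTopic"));

while (true) {
    // Newer clients deprecate poll(long) in favor of poll(Duration).
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
    producer.beginTransaction();
    for (ConsumerRecord<String, String> record : records)
        producer.send(producerRecord("outputTopic", record));
    // Commit the consumed offsets in the same transaction as the output records.
    producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
    producer.commitTransaction();
}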
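The snippet above calls currentOffsets(consumer) without defining it. What follows is a minimal sketch of the arithmetic such a helper would need, not the gist author's implementation: in Kafka, the offset committed for a partition is the offset of the next record to read, that is, the last processed offset plus one. To keep the example runnable without the Kafka client on the classpath, the Rec class and the "topic-partition" string keys are hypothetical stand-ins for ConsumerRecord and TopicPartition.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OffsetSketch {
    /** Hypothetical stand-in for a consumed record; only its coordinates matter here. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * For each topic-partition in the batch, returns the offset of the NEXT
     * record to read: the highest processed offset plus one.
     */
    static Map<String, Long> nextOffsets(List<Rec> batch) {
        Map<String, Long> next = new TreeMap<>();
        for (Rec r : batch) {
            String key = r.topic + "-" + r.partition;
            // Keep the largest "offset + 1" seen for this partition.
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
            new Rec("inputTopic", 0, 41),
            new Rec("inputTopic", 0, 42),
            new Rec("inputTopic", 1, 7));
        // Partition 0 was processed through offset 42, partition 1 through offset 7.
        System.out.println(nextOffsets(batch));
    }
}
```

With the real client, the same values would be wrapped in OffsetAndMetadata objects keyed by TopicPartition before being passed to sendOffsetsToTransaction.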
Every example I see regarding transactions seems to assume that you have a consumer in the same process to supply the offsets.
This is exceptionally ambiguous when learning Kafka. How does the producer know the correct offsets if it only produces events and isn't transforming records read by a consumer?
How do you keep track of offsets when you have no idea what has been read by consumers you don't even know about, and when those consumers are not supposed to be committing their own offsets?