@apurvam
Last active April 26, 2023 09:59
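// createKafkaProducer, createKafkaConsumer, producerRecord, and currentOffsets
// are placeholder helpers; they are not part of the Kafka client API.
KafkaProducer producer = createKafkaProducer(
    "bootstrap.servers", "localhost:9092",
    "transactional.id", "my-transactional-id");
// Register the transactional.id and fence off older producers that used it.
producer.initTransactions();

KafkaConsumer consumer = createKafkaConsumer(
    "bootstrap.servers", "localhost:9092",
    "group.id", "my-group-id",
    "isolation.level", "read_committed");
consumer.subscribe(singleton("inputTopic"));

while (true) {
    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
    producer.beginTransaction();
    for (ConsumerRecord record : records)
        producer.send(producerRecord("outputTopic", record));
    // Commit the consumed offsets atomically with the produced records.
    // The second argument must match the consumer's group.id.
    producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
    producer.commitTransaction();
}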
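
The snippet above passes configuration to its helpers as alternating key/value strings. As a minimal sketch of how such a helper might turn those arguments into a `java.util.Properties` object, assuming a hypothetical class `ClientConfigSketch` and method `toProperties` (neither appears in the gist or in the Kafka client API):

```java
import java.util.Properties;

// Hypothetical helper class for illustration; not part of the Kafka client API.
class ClientConfigSketch {

    // Converts alternating key/value arguments into a Properties object.
    static Properties toProperties(String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException(
                "Expected an even number of arguments (key/value pairs)");
        }
        Properties props = new Properties();
        for (int i = 0; i < keyValues.length; i += 2) {
            props.setProperty(keyValues[i], keyValues[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same settings the gist passes to its producer helper.
        Properties producerProps = toProperties(
            "bootstrap.servers", "localhost:9092",
            "transactional.id", "my-transactional-id");
        System.out.println(producerProps.getProperty("transactional.id"));
    }
}
```

A real helper would likely also set `key.serializer` and `value.serializer` (or the deserializer equivalents for a consumer) before handing the `Properties` to the `KafkaProducer` or `KafkaConsumer` constructor.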
douglasg14b commented Sep 1, 2022

Every example I see regarding transactions seems to assume that you have a consumer in the same process to supply the offsets.

This is exceptionally ambiguous when learning Kafka. How does the producer know the correct offset if it is only producing events, and isn't just transforming records from a consumer?

How do you keep track of offsets when you have no idea what has been read by consumers you don't even know about, and those consumers are not supposed to be committing their own offsets?
