@jonathansantilli
Last active October 13, 2022 08:30
Basic examples comparing a transactional producer with a non-transactional producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerScenariosExample {
    public static void main(String[] args) {
        /*
         * Non-transactional producer (successful case)
         *
         * Creates a non-transactional producer and sends 100 records to the Kafka brokers.
         */
        {
            // Block scope keeps props and producer local to this scenario.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.close();
        }
        /*
         * Transactional producer (successful case)
         *
         * Creates a transactional producer and sends 100 records to the Kafka brokers
         * inside a single transaction.
         *
         * Note the setting props.put("transactional.id", "my-transactional-id");
         */
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "my-transactional-id"); // This ID must be unique across different producers
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();
            producer.close();
        }
        /*
         * Transactional producer (unsuccessful case)
         *
         * Creates a transactional producer (note props.put("transactional.id", "my-transactional-id");)
         * and tries to send 100 records to the Kafka brokers without calling:
         *
         * KafkaProducer#initTransactions()
         * KafkaProducer#beginTransaction()
         * KafkaProducer#commitTransaction()
         */
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "my-transactional-id"); // This ID must be unique across different producers
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                // At this point, KafkaProducer#send() will throw:
                // Exception in thread "main" java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.close();
        }
        /*
         * Trying to use transactional methods without setting transactional.id (unsuccessful case)
         */
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // At this point, KafkaProducer#initTransactions() will throw:
            // Exception in thread "main" java.lang.IllegalStateException: Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();
            producer.close();
        }
        /*
         * Trying to mix transactional and non-transactional sends (unsuccessful case)
         *
         * Creates a transactional producer and sends 100 records to the Kafka brokers in a transaction.
         * Then it tries to send one more record without calling KafkaProducer#beginTransaction() first.
         */
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "my-transactional-id"); // This ID must be unique across different producers
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();
            // At this point, sending a record outside a transaction makes KafkaProducer#send() throw:
            //
            // Exception in thread "main" java.lang.IllegalStateException: Cannot call send in state READY
            //
            // This demonstrates that a transactional producer instance cannot also be used
            // as a non-transactional producer.
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(1), Integer.toString(1)));
            producer.close();
        }
    }
}
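
The unsuccessful cases in which send() throws come down to one rule: on a transactional producer, send() is only legal while a transaction is open. The sketch below models that rule as a small, self-contained state machine, so it can be run without a broker. It is a simplified illustration, not Kafka's implementation: the class ToyTransactionalProducer, its three states, and its in-memory record lists are hypothetical, and only the exception message format follows the one quoted in the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of a transactional producer's state checks.
// Not Kafka code: the states and in-memory lists are assumptions for illustration.
class ToyTransactionalProducer {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    void initTransactions() {
        requireState(State.UNINITIALIZED, "initTransactions");
        state = State.READY;
    }

    void beginTransaction() {
        requireState(State.READY, "beginTransaction");
        state = State.IN_TRANSACTION;
    }

    // Records are only accepted while a transaction is open.
    void send(String record) {
        requireState(State.IN_TRANSACTION, "send");
        pending.add(record);
    }

    // Makes all pending records visible and returns to READY.
    void commitTransaction() {
        requireState(State.IN_TRANSACTION, "commitTransaction");
        committed.addAll(pending);
        pending.clear();
        state = State.READY;
    }

    List<String> committedRecords() {
        return Collections.unmodifiableList(committed);
    }

    private void requireState(State expected, String operation) {
        if (state != expected) {
            throw new IllegalStateException("Cannot call " + operation + " in state " + state);
        }
    }

    public static void main(String[] args) {
        ToyTransactionalProducer producer = new ToyTransactionalProducer();
        producer.initTransactions();
        producer.beginTransaction();
        for (int i = 0; i < 100; i++) {
            producer.send(Integer.toString(i));
        }
        producer.commitTransaction();
        System.out.println(producer.committedRecords().size()); // prints 100

        try {
            producer.send("1"); // no open transaction
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Cannot call send in state READY"
        }
    }
}
```

After commitTransaction() the toy producer is back in READY, which is why the extra send() fails until beginTransaction() is called again.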