Last active
October 13, 2022 08:30
-
-
Save jonathansantilli/3b69ebbcd24e7a30f66db790ef648f99 to your computer and use it in GitHub Desktop.
Show basic examples and comparison between a Transactional Producer and a non Transactional Producer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerScenariosExample { | |
public static void main(String[] args) { | |
/* | |
* Non Transactional Producer (successful case) | |
* | |
* This example will create a non transactional producer and send 100 records to the Kafka Brokers. | |
* */ | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
KafkaProducer<String, String> producer = new KafkaProducer<>(props); | |
for (int i = 0; i < 100; i++) { | |
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); | |
} | |
producer.close(); | |
/* | |
* Transactional Producer (successful case) | |
* | |
* This example will create a transactional producer and send 100 records to the Kafka Brokers. | |
* | |
* Look at props.put("transactional.id", "my-transactional-id"); | |
**/ | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("transactional.id", "my-transactional-id"); // This ID must be unique across the diff producers | |
KafkaProducer<String, String> producer = new KafkaProducer<>(props); | |
producer.initTransactions(); | |
producer.beginTransaction(); | |
for (int i = 0; i < 100; i++) { | |
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); | |
} | |
producer.commitTransaction(); | |
producer.close(); | |
/* | |
* Transactional Producer (unsuccessful case) | |
* | |
* This example will create a transactional producer (look at props.put("transactional.id", "my-transactional-id");) | |
* | |
* ...and try to send 100 records to the Kafka Brokers without calling the methods: | |
* | |
* KafkaProducer#initTransactions() | |
* KafkaProducer#.beginTransaction() | |
* KafkaProducer#.commitTransaction() | |
* | |
* */ | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("transactional.id", "my-transactional-id"); // This ID must be unique across the diff producers | |
KafkaProducer<String, String> producer = new KafkaProducer<>(props); | |
for (int i = 0; i < 100; i++) { | |
// At this point, the method KafkaProducer#send() will throw: | |
// Exception in thread "main" java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled. | |
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); | |
} | |
producer.close(); | |
/* | |
* Trying to create a Transactional Producer without setting the transactional.id (unsuccessful case) | |
* | |
**/ | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
KafkaProducer<String, String> producer = new KafkaProducer<>(props); | |
// At this point it will throw a: | |
// Exception in thread "main" java.lang.IllegalStateException: Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property | |
producer.initTransactions(); | |
producer.beginTransaction(); | |
for (int i = 0; i < 100; i++) { | |
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); | |
} | |
producer.commitTransaction(); | |
/* | |
* Trying to mix Transactional Producer with non Transactional (unsuccessful case) | |
* | |
* This example will create a transactional producer and send 100 records to the Kafka Brokers. | |
* Then it will try to send 1 records without calling the KafkaProducer#beginTransaction() first. | |
**/ | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("transactional.id", "my-transactional-id"); // This ID must be unique across the diff producers | |
KafkaProducer<String, String> producer = new KafkaProducer<>(props); | |
producer.initTransactions(); | |
producer.beginTransaction(); | |
for (int i = 0; i < 100; i++) { | |
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); | |
} | |
producer.commitTransaction(); | |
// At this point, trying to send a record outside a transaction, the KafkaProducer#send() method will throw: | |
// | |
// Exception in thread "main" java.lang.IllegalStateException: Cannot call send in state READY | |
// | |
// Demonstrating that we can not mix the behaviour of transactional producers instances | |
// with the behaviour of a non transactional producer. | |
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(1), Integer.toString(1))); | |
producer.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment