Created
July 13, 2020 17:48
-
-
Save jesusjavierdediego/22a24f016814d7bd6bbfb89936297cf6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public KafkaProducer<String, String> getBatchProducer() { | |
Properties properties = new Properties(); | |
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | |
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | |
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | |
//safe producer | |
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); | |
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); | |
properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE)); | |
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");// kafka 2.x+, use 1 if using ordering | |
// Batching to improve throughput | |
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); | |
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100"); //ms of lag between sending messages (lag of the micro-batch) | |
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(32*1024)); // If the size is exceeded the batch will be sent anyway | |
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); | |
return producer; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment