Skip to content

Instantly share code, notes, and snippets.

@jesusjavierdediego
Created July 13, 2020 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jesusjavierdediego/22a24f016814d7bd6bbfb89936297cf6 to your computer and use it in GitHub Desktop.
Save jesusjavierdediego/22a24f016814d7bd6bbfb89936297cf6 to your computer and use it in GitHub Desktop.
/**
 * Creates a new {@link KafkaProducer} with String keys and String values,
 * configured for safe delivery (idempotence, acks from all replicas,
 * unbounded retries) and for batched, snappy-compressed sends.
 *
 * <p>Every call constructs a fresh producer instance. The broker list is read
 * from {@code bootstrapServers}, which is declared outside this method
 * (presumably a field of the enclosing class — confirm against the class).
 *
 * @return a newly constructed producer built from the settings below
 */
public KafkaProducer<String, String> getBatchProducer() {
    final String stringSerializer = StringSerializer.class.getName();
    final Properties config = new Properties();

    // Connection and (de)serialization: keys and values are both plain Strings.
    config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

    // Safe producer: idempotent writes, acknowledged by all replicas, retried indefinitely.
    config.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    config.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    config.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
    // A single in-flight request per connection; the original author chose 1
    // to keep message ordering (noted as applying to Kafka 2.x+).
    config.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

    // Batching for throughput: compress with snappy, let records linger up to
    // 100 ms so a micro-batch can form, and use a 32 KiB batch size (a batch
    // that reaches this size is sent without waiting for the linger time).
    config.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
    config.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(32 * 1024));

    return new KafkaProducer<>(config);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment