@pavelfomin
Last active January 11, 2024 21:03
Spring Kafka Producer using KafkaTemplate, CompletableFuture and KafkaTemplate.flush()

See https://docs.spring.io/spring-kafka/docs/1.0.6.RELEASE/reference/html/_reference.html, which says:

If you wish to block the sending thread, to await the result, you can invoke the future’s get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however that flushing will likely significantly reduce performance.
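The flush-then-wait pattern described above can be sketched with plain java.util.concurrent types. This is only an illustration: BufferingSender is a hypothetical stand-in, not a Spring Kafka class, and it holds every record until flush() is called, whereas a real producer also sends buffered records in the background on its own schedule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FlushThenAwaitSketch {

    /** Hypothetical stand-in for a producer: records stay buffered until flush() is called. */
    static class BufferingSender {
        private final List<CompletableFuture<String>> pending = new ArrayList<>();
        private final List<String> payloads = new ArrayList<>();

        /** Buffers the payload and returns a future that is still incomplete. */
        CompletableFuture<String> send(String payload) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.add(future);
            payloads.add(payload);
            return future;
        }

        /** Completes every buffered future, simulating broker acknowledgements. */
        void flush() {
            for (int i = 0; i < pending.size(); i++) {
                pending.get(i).complete("acked:" + payloads.get(i));
            }
            pending.clear();
            payloads.clear();
        }
    }

    /** Sends every payload, flushes once, then blocks on each future with a timeout. */
    static List<String> sendAllAndAwait(BufferingSender sender, List<String> payloads, long timeoutMs) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String payload : payloads) {
            futures.add(sender.send(payload));
        }
        sender.flush(); // one flush for the whole batch instead of one per send
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            try {
                results.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a send result", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Send did not complete successfully", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [acked:a, acked:b]
        System.out.println(sendAllAndAwait(new BufferingSender(), List.of("a", "b"), 1000));
    }
}
```

Flushing once per batch rather than once per send is what keeps the performance cost mentioned in the quote manageable.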

Inspired by https://github.com/spring-projects/spring-batch/blob/main/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java#L65

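// Fragment of a generic producer class. getTopicName() and the timeout field
// (in milliseconds) are assumed to be defined elsewhere in that class.
@Getter
private final KafkaTemplate<String, T> kafkaTemplate;

// Hands the record to the template and returns without waiting for the broker.
public CompletableFuture<SendResult<String, T>> sendMessage(String key, T message, Integer partition) {
    try {
        return getKafkaTemplate().send(getTopicName(), partition, key, message);
    } catch (Exception e) {
        throw new RuntimeException("Failed to send a message to topic: " + getTopicName(), e);
    }
}

// Flushes the template once, then waits for every pending send result.
// The first failed or timed-out send aborts the loop and leaves the list uncleared.
protected void flush(List<CompletableFuture<SendResult<String, T>>> futures) throws Exception {
    getKafkaTemplate().flush();
    for (var future : futures) {
        getSendResult(future);
    }
    futures.clear();
}

// Blocks for at most timeout milliseconds waiting for a single send result.
protected SendResult<String, T> getSendResult(Future<SendResult<String, T>> future)
        throws ExecutionException, InterruptedException, TimeoutException {
    return future.get(timeout, TimeUnit.MILLISECONDS);
}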
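
The timed get() used in getSendResult can end in several ways, and a caller of flush() will see each as a different exception. Below is a minimal sketch using only JDK classes; the describe helper and its string labels are hypothetical and exist only to show which exception corresponds to which outcome.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetSketch {

    /** Describes how a timed get() on the given future ends; the labels are illustrative only. */
    static String describe(Future<String> future, long timeoutMs) {
        try {
            return "ok: " + future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The future was still incomplete when the timeout expired.
            return "timed out";
        } catch (ExecutionException e) {
            // The future completed exceptionally; the original error is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // prints "ok: sent"
        System.out.println(describe(CompletableFuture.completedFuture("sent"), 50));
        // prints "failed: broker unavailable"
        System.out.println(describe(
                CompletableFuture.failedFuture(new IllegalStateException("broker unavailable")), 50));
        // prints "timed out" after roughly 50 ms, because nothing ever completes this future
        System.out.println(describe(new CompletableFuture<>(), 50));
    }
}
```

In the producer fragment above these exceptions propagate out of flush(), so the caller decides whether to retry, skip, or fail the batch.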