Skip to content

Instantly share code, notes, and snippets.

@ShubhamRwt
Created August 6, 2021 17:59
Show Gist options
  • Save ShubhamRwt/d2d3a0ca51d4a2f578cf258522cf9d3e to your computer and use it in GitHub Desktop.
Save ShubhamRwt/d2d3a0ca51d4a2f578cf258522cf9d3e to your computer and use it in GitHub Desktop.
public <K, V> void produce(String producerName, int messageCount, Serializer<K> keySerializer, Serializer<V> valueSerializer, Runnable completionCallback, Supplier<ProducerRecord<K, V>> messageSupplier) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaCluster.bootstrapServers());
props.setProperty("acks", Integer.toString(1));
Thread t = new Thread(() -> {
LOGGER.info("Starting producer {} to write {} messages", producerName, messageCount);
try {
KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
Throwable cause = null;
try {
for (int i = 0; i != messageCount; ++i) {
ProducerRecord<K, V> record = messageSupplier.get();
producer.send(record);
producer.flush();
LOGGER.info("Producer {}: sent message {}", producerName, record);
}
} catch (Throwable e) {
cause = e;
throw e;
} finally {
if (cause != null) {
try {
producer.close();
} catch (Throwable c) {
cause.addSuppressed(c);
}
} else {
producer.close();
}
}
} finally {
if (completionCallback != null) {
completionCallback.run();
}
LOGGER.debug("Stopping producer {}", producerName);
}
});
t.setName(producerName + "-thread");
t.start();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment