Created
August 6, 2021 17:59
-
-
Save ShubhamRwt/d2d3a0ca51d4a2f578cf258522cf9d3e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public <K, V> void produce(String producerName, int messageCount, Serializer<K> keySerializer, Serializer<V> valueSerializer, Runnable completionCallback, Supplier<ProducerRecord<K, V>> messageSupplier) { | |
Properties props = new Properties(); | |
props.setProperty("bootstrap.servers", kafkaCluster.bootstrapServers()); | |
props.setProperty("acks", Integer.toString(1)); | |
Thread t = new Thread(() -> { | |
LOGGER.info("Starting producer {} to write {} messages", producerName, messageCount); | |
try { | |
KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer); | |
Throwable cause = null; | |
try { | |
for (int i = 0; i != messageCount; ++i) { | |
ProducerRecord<K, V> record = messageSupplier.get(); | |
producer.send(record); | |
producer.flush(); | |
LOGGER.info("Producer {}: sent message {}", producerName, record); | |
} | |
} catch (Throwable e) { | |
cause = e; | |
throw e; | |
} finally { | |
if (cause != null) { | |
try { | |
producer.close(); | |
} catch (Throwable c) { | |
cause.addSuppressed(c); | |
} | |
} else { | |
producer.close(); | |
} | |
} | |
} finally { | |
if (completionCallback != null) { | |
completionCallback.run(); | |
} | |
LOGGER.debug("Stopping producer {}", producerName); | |
} | |
}); | |
t.setName(producerName + "-thread"); | |
t.start(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment