@idrabenia
Created September 14, 2016 14:22
public void run() {
    logger.log(Level.INFO, ProducerRunnable.class.toString() + " is starting.");

    while (!closing) {
        String fieldName = "records";
        // Push a message into the list to be sent.
        MessageList list = new MessageList();
        list.push("This is a test message" + producedMessages);

        try {
            // Create a producer record which will be sent
            // to the Message Hub service, providing the topic
            // name, field name and message. The field name and
            // message are converted to UTF-8.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                    topic,
                    fieldName.getBytes("UTF-8"),
                    list.toString().getBytes("UTF-8"));

            // Synchronously wait for a response from Message Hub / Kafka.
            RecordMetadata m = kafkaProducer.send(record).get();
            producedMessages++;

            logger.log(Level.INFO, "Message produced, offset: " + m.offset());

            Thread.sleep(1000);
        } catch (final Exception e) {
            e.printStackTrace();
            shutdown();
            // Consumer will hang forever, so exit program.
            System.exit(-1);
        }
    }

    logger.log(Level.INFO, ProducerRunnable.class.toString() + " is shutting down.");
}
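
The loop above depends on a `closing` flag and a `shutdown()` method that this snippet does not show. The following is a minimal sketch of how such a flag is commonly wired, not the gist author's actual code. The class name `StoppableWorker`, the `pauseMillis` parameter, and the `iterations` counter are hypothetical, and the Kafka `send(...).get()` call is replaced by a counter increment so the example runs without any Kafka dependency.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ProducerRunnable; illustrates only the stop flag. */
class StoppableWorker implements Runnable {
    // volatile so that a write made by another thread in shutdown()
    // becomes visible to the loop condition in run().
    private volatile boolean closing = false;
    private final AtomicInteger iterations = new AtomicInteger();
    private final long pauseMillis; // assumed pause between iterations

    StoppableWorker(long pauseMillis) {
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void run() {
        while (!closing) {
            // Placeholder for the real work (building and sending a record).
            iterations.incrementAndGet();
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop
                // instead of terminating the whole JVM.
                Thread.currentThread().interrupt();
                closing = true;
            }
        }
    }

    /** Asks the loop to stop after the current iteration. */
    void shutdown() {
        closing = true;
    }

    int iterations() {
        return iterations.get();
    }
}
```

A caller would typically start the worker on its own thread, call `shutdown()` from another thread, and then `join()` the worker thread. Treating an interrupt as a stop request is a design choice made for this sketch; the original code instead routes every exception, including an interrupted sleep, to `System.exit(-1)`.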