Skip to content

Instantly share code, notes, and snippets.

@devoncrouse
Created July 25, 2013 16:52
Show Gist options
  • Save devoncrouse/6081648 to your computer and use it in GitHub Desktop.
Simple class to consume from a Kafka topic
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableMap;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

import org.apache.log4j.Logger;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
/**
 * Simple consumer that reads messages from a Kafka topic, spawning one worker
 * thread per Kafka stream (broker/partition), and parses each message payload
 * as a JSON object.
 *
 * <p>Thread-safety: the shutdown flag and the consecutive-error counter are
 * shared across worker threads and therefore held in atomics.
 */
public class MessageConsumer implements Runnable {
  private static final Logger logger = Logger.getLogger(MessageConsumer.class);

  /** Consecutive Kafka pull errors tolerated before requesting shutdown. */
  private static final int ALLOWED_KAFKA_ERRORS = 10;

  /** Counts consecutive pull failures; reset to zero on every successful pull. */
  private final AtomicInteger kafkaErrors = new AtomicInteger();

  /** Set to true to signal all consumer threads to stop consuming. */
  private final AtomicBoolean shutdown = new AtomicBoolean(false);

  private final Properties conf;

  /**
   * @param conf Kafka/Zookeeper configuration properties (see {@link ConsumerConfig})
   */
  MessageConsumer(final Properties conf) {
    this.conf = conf;
  }

  @Override
  public void run() {
    final List<KafkaStream<Message>> streams = connectToQueue(conf);
    ExecutorService executor = Executors.newFixedThreadPool(streams.size());
    // Execute a consumer thread for each Kafka message stream
    // (representing a broker/partition)
    for (final KafkaStream<Message> stream : streams) {
      executor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            consumeTopic(stream.iterator());
          } catch (Exception e) {
            // Any unexpected failure in a worker takes the whole process down.
            shutdown.set(true);
            logger.fatal("Caught unexpected exception", e);
            System.exit(-1);
          }
        }
      });
    }
  }

  /**
   * Consume messages from one Kafka stream until shutdown is requested or too
   * many consecutive pull errors occur.
   *
   * @param kafkaIter iterator over a single Kafka message stream
   * @throws IOException if message processing fails
   * @throws InterruptedException if the consuming thread is interrupted
   */
  private void consumeTopic(ConsumerIterator<Message> kafkaIter)
      throws IOException, InterruptedException {
    while (!shutdown.get()) {
      MessageAndMetadata<Message> rawMsg;
      try {
        if (!kafkaIter.hasNext()) { // Generally doesn't happen since hasNext() blocks
          return;
        }
        rawMsg = kafkaIter.next();
        kafkaErrors.set(0); // successful pull resets the consecutive-error count
      } catch (RuntimeException e) {
        logger.error("Unable to pull message from Kafka", e);
        if (kafkaErrors.incrementAndGet() >= ALLOWED_KAFKA_ERRORS) {
          logger.fatal("Too many consecutive kafka errors detected. Shutting down");
          shutdown.set(true);
        }
        return;
      }
      // Here's the consumed message from Kafka
      String msg = convertPayloadToString(rawMsg);
      try {
        // Here's your result object
        JSONObject obj = new JSONObject(msg);
      } catch (JSONException e) {
        logger.error("Invalid JSON object", e);
      }
    }
  }

  /**
   * Convert the consumed message payload to a UTF-8 String.
   *
   * @param msgAndMetadata message plus metadata from the Kafka consumer
   * @return String representation of the message payload
   */
  private String convertPayloadToString(final MessageAndMetadata<Message> msgAndMetadata) {
    Message rawMsg = msgAndMetadata.message();
    ByteBuffer buf = rawMsg.payload();
    // remaining() (not limit()) is safe even if the buffer's position is nonzero.
    byte[] dst = new byte[buf.remaining()];
    buf.get(dst);
    // Decode explicitly as UTF-8 rather than the platform-default charset.
    return new String(dst, StandardCharsets.UTF_8);
  }

  /**
   * Create our Kafka connection.
   *
   * @param conf Properties object with Kafka/Zookeeper options; reads
   *     {@code kafka.topic} (default "MessageHeadersBody") and
   *     {@code kafka.streams} (default "3")
   * @return Collection of Kafka message streams for the configured topic
   */
  private List<KafkaStream<Message>> connectToQueue(final Properties conf) {
    String topic = conf.getProperty("kafka.topic", "MessageHeadersBody");
    int streamCount = Integer.parseInt(conf.getProperty("kafka.streams", "3"));
    // Create the connection to the cluster
    ConsumerConfig consumerConfig = new ConsumerConfig(conf);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    // Create multiple partitions of the stream for multiple consumer threads
    Map<String, List<KafkaStream<Message>>> topicMessageStreams =
        consumerConnector.createMessageStreams(ImmutableMap.of(topic, streamCount));
    return topicMessageStreams.get(topic);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment