Skip to content

Instantly share code, notes, and snippets.

@terrancesnyder
Last active October 15, 2015 11:03
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save terrancesnyder/8886157 to your computer and use it in GitHub Desktop.
Save terrancesnyder/8886157 to your computer and use it in GitHub Desktop.
Example of processing Kafka messages using a jQuery-like deferred/promise API for cleaner async code.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.ByteBufferInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.jdeferred.Promise;
import org.jdeferred.impl.DeferredObject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import kafka.producer.ProducerConfig;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
 * Example Kafka (0.7-era high-level consumer API) consumer that decodes
 * Avro-encoded {@code Event} messages and publishes them through a
 * jdeferred {@link Promise}: each decoded event arrives via {@code notify()},
 * a decode failure rejects the promise with the exception message, and the
 * promise resolves with {@code "done"} when a stream ends.
 */
public class KafkaConsumerExample {

    private static final Logger log = Logger.getLogger(KafkaConsumerExample.class);

    // Avro reader and decoder factory are reused across messages; the decoder
    // instance itself is also reused via the binaryDecoder(..., reuse) overload.
    private final SpecificDatumReader<Event> avroEventReader =
            new SpecificDatumReader<Event>(Event.SCHEMA$);
    private final DecoderFactory avroDecoderFactory = DecoderFactory.get();
    private final ConsumerConnector kafkaConsumer;

    /**
     * Creates a consumer connector for the given Zookeeper ensemble.
     *
     * @param zookeeper Zookeeper connect string ({@code host:port[,host:port...]})
     * @param groupid   Kafka consumer group id
     */
    public KafkaConsumerExample(String zookeeper, String groupid) {
        Properties props = new Properties();
        props.put("zk.connect", zookeeper);
        props.put("zk.connectiontimeout.ms", "1000");
        // BUG FIX: the original put an undeclared 'consumerId' here; the
        // constructor parameter is 'groupid'.
        props.put("groupid", groupid);
        kafkaConsumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    /**
     * Starts consuming {@code topic} on a small fixed thread pool.
     *
     * @param topic the Kafka topic to consume
     * @return a promise that is notified once per decoded {@code Event},
     *         rejected with the exception message on the first decode failure,
     *         and resolved with {@code "done"} when a stream is exhausted
     */
    public Promise<String, String, Event> listen(String topic) {
        final DeferredObject<String, String, Event> dfd =
                new DeferredObject<String, String, Event>();
        int threads = 4;
        // BUG FIX: the original called createMessageStreams on an undeclared
        // 'consumerConnector'; the field is 'kafkaConsumer'.
        Map<String, List<KafkaStream<Message>>> topicMessageStreams =
                kafkaConsumer.createMessageStreams(ImmutableMap.of(topic, threads));
        List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);
        // NOTE(review): this executor is never shut down and is not reachable by
        // callers, so the consumer threads cannot be stopped cleanly — consider
        // exposing a close()/shutdown hook.
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (final KafkaStream<Message> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    InputStream kafkaMessageInputStream = null;
                    BinaryDecoder avroBinaryDecoder = null;
                    Event avroEvent = null;
                    for (MessageAndMetadata<Message> packet : stream) {
                        try {
                            kafkaMessageInputStream = new ByteBufferInputStream(
                                    Lists.newArrayList(packet.message().payload()));
                            // BUG FIX: the original referenced undeclared
                            // 'decoderFactory' and 'reader'; the fields are
                            // 'avroDecoderFactory' and 'avroEventReader'.
                            avroBinaryDecoder = avroDecoderFactory.binaryDecoder(
                                    kafkaMessageInputStream, avroBinaryDecoder);
                            avroEvent = avroEventReader.read(avroEvent, avroBinaryDecoder);
                            // Guard: jdeferred throws IllegalStateException if
                            // notify() is called after the deferred was rejected.
                            if (dfd.isPending()) {
                                dfd.notify(avroEvent);
                            }
                        } catch (Exception ex) {
                            log.error("Unable to process event from kafka, see inner exception details", ex);
                            if (dfd.isPending()) {
                                dfd.reject(ex.getMessage());
                            }
                        } finally {
                            // BUG FIX: the original closed an undeclared 'is';
                            // the stream variable is 'kafkaMessageInputStream'.
                            IOUtils.closeQuietly(kafkaMessageInputStream);
                        }
                    }
                    // Guard: resolve() after a reject() would throw
                    // IllegalStateException inside the worker thread.
                    if (dfd.isPending()) {
                        dfd.resolve("done");
                    }
                }
            });
        }
        return dfd.promise();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment