Skip to content

Instantly share code, notes, and snippets.

Created May 16, 2016 15:43
Show Gist options
  • Save am-MongoDB/77eb68a6af50f93453a7c316dbe1c6b5 to your computer and use it in GitHub Desktop.
Save am-MongoDB/77eb68a6af50f93453a7c316dbe1c6b5 to your computer and use it in GitHub Desktop.
Example MongoDB consumer for Apache Kafka
package com.clusterdb.kafka;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;
public class MongoDBSimpleConsumer {
public static void main(String args[]) {
MongoDBSimpleConsumer example = new MongoDBSimpleConsumer();
//long maxReads = Long.parseLong(args[0]);
long maxReads = 100;
//String topic = args[1];
String topic = "clusterdb-topic1";
//int partition = Integer.parseInt(args[2]);
int partition = 0;
List<String> seeds = new ArrayList<String>();
//int port = Integer.parseInt(args[4]);
int port = 9092;
try {, topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
private List<String> m_replicaBrokers = new ArrayList<String>();
public MongoDBSimpleConsumer() {
m_replicaBrokers = new ArrayList<String>();
public void run(long a_maxReads, String a_topic, int a_partition,
List<String> a_seedBrokers, int a_port) throws Exception {
// find the meta data about the topic and partition we are interested in
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
System.out.println("Can't find metadata for Topic and Partition. Exiting");
if (metadata.leader() == null) {
System.out.println("Can't find Leader for Topic and Partition. Exiting");
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer,a_topic, a_partition,
kafka.api.OffsetRequest.EarliestTime(), clientName);
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port,
100000, 64 * 1024, clientName);
FetchRequest req = new FetchRequestBuilder()
.addFetch(a_topic, a_partition, readOffset, 100000)
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
if (numErrors > 5) break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask
// for the last element to reset
readOffset = getLastOffset(consumer,a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
numErrors = 0;
long numRead = 0;
MongoClient client = new MongoClient();
MongoDatabase db = client.getDatabase("clusterdb");
MongoCollection<Document> fishCollection = db.getCollection("fish");
Gson gson = new Gson();
Type type = new TypeToken<Fish>() {}.getType();
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic,
a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset
+ " Expecting: " + readOffset);
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
Fish incomingFish = gson.fromJson(new String(bytes, "UTF-8"), type);
if (numRead == 0) {
try {
} catch (InterruptedException ie) {
if (consumer != null) consumer.close();
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition) );
return 0;
long[] offsets = response.offsets(topic, partition);
return offsets[0];
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition,
int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic,
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper
// a second to recover second time, assume the broker did recover before failover,
// or it was a non-Broker issue
goToSleep = true;
} else {
return metadata.leader().host();
if (goToSleep) {
try {
} catch (InterruptedException ie) {
System.out.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port,
String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed + "] to find Leader for ["
+ a_topic
+ ", " + a_partition + "] Reason: " + e);
} finally {
if (consumer != null) consumer.close();
if (returnMetaData != null) {
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
return returnMetaData;
Copy link

Viyaan commented Aug 26, 2017

Hello can you please share the code of Fish Class.

Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment