Skip to content
Create a gist now

Instantly share code, notes, and snippets.

standalone consumer
package com.trendrr.shared.kafka;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
/**
 * Standalone Kafka consumer that reads numeric messages from a single topic
 * and prints, for each one, whether its value has already been seen.
 *
 * <p>Seen values are kept in an in-memory set that is emptied whenever it grows
 * past {@link #MAX_TRACKED_VALUES}; a repeat of a value received before the most
 * recent reset is therefore reported as unique.
 *
 * @author mark grabois
 */
public class StandaloneConsumer {
    /** ZooKeeper connect string: comma-separated {@code host:port} pairs, no whitespace. */
    private static final String zkConnect = "server1:2181,server2:2181,server3:2181";

    /** Values seen since the last reset of the set. */
    private static final Set<Long> uniques = new HashSet<Long>();

    /** Once the set holds more than this many values it is cleared to bound memory use. */
    private static final int MAX_TRACKED_VALUES = 250000;

    /**
     * Charset used to decode message payloads. Pinned explicitly so decoding does not
     * vary with the JVM's platform default.
     * NOTE(review): assumes producers write the numbers as ASCII/UTF-8 text - confirm.
     */
    private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");

    /**
     * Connects to Kafka through ZooKeeper, consumes {@code test_topic_1} as group
     * {@code test_group_1}, and prints {@code unique : n} or {@code duplicate : n}
     * for every numeric message. Non-numeric messages are reported on stderr and skipped.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", zkConnect);
        props.put("autooffset.reset", "largest"); // default to most recent item in the queue.
        props.put("groupid", "test_group_1");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        try {
            for (MessageAndMetadata<Message> message : createMessageStream(consumer, "test_topic_1")) {
                String messageString = getMessage(message);
                long num;
                try {
                    num = Long.parseLong(messageString);
                } catch (NumberFormatException e) {
                    // One malformed payload should not take the whole consumer down.
                    System.err.println("skipping non-numeric message : " + messageString);
                    continue;
                }
                // Set.add returns false when the value was already present.
                if (uniques.add(num)) {
                    System.out.println("unique : " + messageString);
                } else {
                    System.out.println("duplicate : " + messageString);
                }
                if (uniques.size() > MAX_TRACKED_VALUES) {
                    uniques.clear();
                }
            }
        } finally {
            // Release the connector even when iteration ends with an exception.
            consumer.shutdown();
        }
    }

    /**
     * Requests a single stream for the given topic from the connector.
     *
     * @param consumer connector to create the stream on
     * @param topicId  topic to consume
     * @return the first (and only requested) stream for {@code topicId}
     */
    private static KafkaStream<Message> createMessageStream(ConsumerConnector consumer, String topicId) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicId, 1); // one stream for this topic
        return consumer.createMessageStreams(topicCountMap).get(topicId).get(0);
    }

    /**
     * Decodes the payload of a message into a string using {@link #PAYLOAD_CHARSET}.
     *
     * @param message message whose payload is read
     * @return the remaining payload bytes decoded as text
     */
    private static String getMessage(MessageAndMetadata<Message> message) {
        ByteBuffer buffer = message.message().payload();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, PAYLOAD_CHARSET);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.