Skip to content

Instantly share code, notes, and snippets.

@johnllao
Created September 17, 2015 23:53
Show Gist options
  • Save johnllao/500005993e0de750fac5 to your computer and use it in GitHub Desktop.
Save johnllao/500005993e0de750fac5 to your computer and use it in GitHub Desktop.
package org.hello.kafka;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class HelloKafka {
private static final String CLIENT_NAME = "hello.kafka";
private static final String TOPIC_NAME = "hello-string";
public static void main(String[] args) {
System.out.println("Hello Kafka (version 1.0.0)");
System.out.println("(c) hello.org 2015");
System.out.println();
send();
receive();
System.out.println("Bye!");
}
private static void send() {
try {
final Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
final String message = "TEST" + UUID.randomUUID().toString().replace("-", "").toUpperCase();
final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, message, message);
producer.send(record).get();
System.out.println("Sent!");
producer.close();
}
catch (Exception e) {
System.out.println(e);
}
}
private static void receive() {
String groupId = UUID.randomUUID().toString().replace("-", "").toUpperCase();
final Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", CLIENT_NAME);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
final ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC_NAME, 1);
final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
final KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC_NAME).get(0);
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
System.out.println("Received message: " + new String(messageAndMetadata.message()));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment