@hanrw
Created December 4, 2015 06:16
version := "0.1"
scalaVersion := "2.11.7"
val versionOfAkka = "2.3.13"
val versionOfKafka = "0.8.2.0"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % versionOfKafka exclude("org.slf4j", "slf4j-log4j12") withSources(),
"org.apache.kafka" %% "kafka" % versionOfKafka exclude("org.slf4j", "slf4j-log4j12") withSources(),
"com.typesafe.akka" %% "akka-actor" % versionOfAkka,
"com.typesafe.akka" %% "akka-cluster" % versionOfAkka,
"com.typesafe.akka" %% "akka-kernel" % versionOfAkka,
"com.typesafe.akka" %% "akka-slf4j" % versionOfAkka,
"com.typesafe.akka" %% "akka-contrib" % versionOfAkka,
"com.typesafe.akka" %% "akka-testkit" % versionOfAkka,
"org.slf4j" % "jul-to-slf4j" % "1.7.7",
"org.apache.curator" % "curator-test" % "2.8.0",
"org.apache.kafka" %% "kafka" % "0.8.2.0" classifier "test"
)
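The "test" classifier on the kafka artifact is what supplies the embedded-broker helpers used below (kafka.utils.TestUtils, kafka.zk.EmbeddedZookeeper). curator-test is declared but unused in this example, which relies on Kafka's own embedded Zookeeper rather than Curator's TestingServer.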
==============================================
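KafkaLocalBroker below boots an in-process Zookeeper and a single Kafka 0.8.2 broker, creates a one-partition topic, publishes two messages through the old Scala producer, and reads them back with the high-level consumer before tearing everything down.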
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;

import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.*; // TestUtils, TestZKUtils, MockTime, Time, ZKStringSerializer$
import kafka.zk.EmbeddedZookeeper;
public class KafkaLocalBroker {

    private int brokerId = 0;
    private String topic = "test";

    public void setUp() throws Exception {
        // start an embedded Zookeeper and a client pointing at it
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

        // start a single Kafka broker on a random free port
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port, true);
        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create the topic with one partition and one replica, then wait for
        // its metadata to propagate before producing
        String[] arguments = new String[]{"--topic", topic, "--partitions", "1", "--replication-factor", "1"};
        TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));
        List<KafkaServer> servers = new ArrayList<KafkaServer>();
        servers.add(kafkaServer);
        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

        // set up the old Scala producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<Integer, byte[]> producer = new Producer<Integer, byte[]>(producerConfig);

        // set up the high-level consumer; a SimpleConsumer is also created but
        // never exercised in this flow (see the fetch sketch after this class)
        Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 64 * 1024, "1");

        // send two messages
        List<KeyedMessage<Integer, byte[]>> messages = new ArrayList<KeyedMessage<Integer, byte[]>>();
        messages.add(new KeyedMessage<Integer, byte[]>(topic, "test-message".getBytes(StandardCharsets.UTF_8)));
        messages.add(new KeyedMessage<Integer, byte[]>(topic, "test-message1".getBytes(StandardCharsets.UTF_8)));
        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        producer.close();

        // delete the group's Zookeeper state so the consumer starts from the beginning
        // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
        zkClient.delete("/consumers/group0");

        // start consuming; with a consumer timeout of -1, hasNext() blocks
        // until a message arrives, so read exactly as many messages as were sent
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        for (int i = 0; i < messages.size(); i++) {
            if (iterator.hasNext()) {
                String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
                System.out.println(msg);
            }
        }

        // cleanup
        simpleConsumer.close();
        consumer.shutdown();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        KafkaLocalBroker kafkaLocalBroker = new KafkaLocalBroker();
        kafkaLocalBroker.setUp();
    }
}
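==============================================
The SimpleConsumer above is constructed but never used, and the gist's original imports (kafka.api.FetchRequestBuilder, kafka.javaapi.FetchRequest) suggest a low-level fetch was planned. Below is a minimal sketch of that path against the 0.8.2 javaapi; the class and method names (SimpleConsumerFetch, printPartitionZero) are illustrative, and it assumes the broker and topic from setUp() are still running.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class SimpleConsumerFetch {

    // Fetch everything in partition 0 of the topic, starting from offset 0,
    // and print each message. The 64 KB fetch size matches the buffer size
    // the SimpleConsumer was created with in setUp().
    public static void printPartitionZero(SimpleConsumer simpleConsumer, String topic) {
        FetchRequest request = new FetchRequestBuilder()
                .clientId("1")
                .addFetch(topic, 0, 0L, 64 * 1024)
                .build();
        FetchResponse response = simpleConsumer.fetch(request);
        ByteBufferMessageSet messageSet = response.messageSet(topic, 0);
        for (MessageAndOffset messageAndOffset : messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(messageAndOffset.offset() + ": " + new String(bytes, StandardCharsets.UTF_8));
        }
    }
}

Because the low-level API addresses partitions and offsets directly, it does not participate in a consumer group, so the /consumers/group0 Zookeeper cleanup needed for the high-level consumer does not apply here.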