Created
December 4, 2015 06:16
-
-
Save hanrw/f3354623bdee5f061a92 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the embedded Kafka broker example (KafkaLocalBroker).
// Settings are separated by blank lines so the file also parses on sbt releases
// older than 0.13.7, which require them.

version := "0.1"

scalaVersion := "2.11.7"

// Single source of truth for the Akka and Kafka artifact versions used below.
val versionOfAkka = "2.3.13"

val versionOfKafka = "0.8.2.0"

libraryDependencies ++= Seq(
  // Kafka client and broker; the log4j SLF4J binding is excluded from both artifacts.
  "org.apache.kafka" % "kafka-clients" % versionOfKafka exclude("org.slf4j", "slf4j-log4j12") withSources(),
  "org.apache.kafka" %% "kafka" % versionOfKafka exclude("org.slf4j", "slf4j-log4j12") withSources(),
  "com.typesafe.akka" %% "akka-actor" % versionOfAkka,
  "com.typesafe.akka" %% "akka-cluster" % versionOfAkka,
  "com.typesafe.akka" %% "akka-kernel" % versionOfAkka,
  "com.typesafe.akka" %% "akka-slf4j" % versionOfAkka,
  "com.typesafe.akka" %% "akka-contrib" % versionOfAkka,
  "com.typesafe.akka" %% "akka-testkit" % versionOfAkka,
  "org.slf4j" % "jul-to-slf4j" % "1.7.7",
  "org.apache.curator" % "curator-test" % "2.8.0",
  // Test-classifier jar of the broker; expected to provide the kafka.utils.TestUtils and
  // kafka.zk.EmbeddedZookeeper helpers the Java example imports. It reuses versionOfKafka
  // so it cannot drift from the main Kafka artifact.
  "org.apache.kafka" %% "kafka" % versionOfKafka classifier "test"
)
==============================================
import kafka.api.FetchRequestBuilder; | |
import kafka.javaapi.FetchRequest; | |
import kafka.javaapi.consumer.SimpleConsumer; | |
import kafka.server.KafkaConfig; | |
import kafka.server.KafkaServer; | |
import kafka.server.KafkaServerStartable; | |
import kafka.utils.*; | |
import kafka.zk.EmbeddedZookeeper; | |
import org.apache.curator.test.TestingServer; | |
import java.nio.charset.StandardCharsets; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import org.I0Itec.zkclient.ZkClient; | |
import kafka.admin.TopicCommand; | |
import kafka.consumer.ConsumerConfig; | |
import kafka.consumer.ConsumerIterator; | |
import kafka.consumer.KafkaStream; | |
import kafka.javaapi.consumer.ConsumerConnector; | |
import kafka.producer.KeyedMessage; | |
import kafka.producer.Producer; | |
import kafka.producer.ProducerConfig; | |
public class KafkaLocalBroker { | |
private int brokerId = 0; | |
private String topic = "test"; | |
public void setUp() throws Exception { | |
// setup Zookeeper | |
String zkConnect = TestZKUtils.zookeeperConnect(); | |
EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect); | |
ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); | |
// setup Broker | |
int port = TestUtils.choosePort(); | |
Properties props = TestUtils.createBrokerConfig(brokerId, port, true); | |
KafkaConfig config = new KafkaConfig(props); | |
Time mock = new MockTime(); | |
KafkaServer kafkaServer = TestUtils.createServer(config, mock); | |
String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"}; | |
// create topic | |
TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments)); | |
List<KafkaServer> servers = new ArrayList<KafkaServer>(); | |
servers.add(kafkaServer); | |
TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000); | |
// setup producer | |
Properties properties = TestUtils.getProducerConfig("localhost:" + port); | |
ProducerConfig producerConfig = new ProducerConfig(properties); | |
Producer producer = new Producer(producerConfig); | |
// setup simple consumer | |
Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1); | |
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); | |
SimpleConsumer consumer2 = new SimpleConsumer("localhost", port, 100, 64*1024,"1"); | |
// send message | |
KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8)); | |
List<KeyedMessage> messages = new ArrayList<KeyedMessage>(); | |
messages.add(data); | |
messages.add(new KeyedMessage(topic, "test-message1".getBytes(StandardCharsets.UTF_8))); | |
producer.send(scala.collection.JavaConversions.asScalaBuffer(messages)); | |
producer.close(); | |
// deleting zookeeper information to make sure the consumer starts from the beginning | |
// see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka | |
zkClient.delete("/consumers/group0"); | |
// starting consumer | |
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); | |
topicCountMap.put(topic, 1); | |
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); | |
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); | |
ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); | |
if(iterator.hasNext()) { | |
String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); | |
System.out.println(msg); | |
} else { | |
} | |
Thread.sleep(2500); | |
// cleanup | |
consumer.shutdown(); | |
kafkaServer.shutdown(); | |
zkClient.close(); | |
zkServer.shutdown(); | |
} | |
public static void main(String[] args) throws Exception { | |
KafkaLocalBroker kafkaLocalBroker = new KafkaLocalBroker(); | |
kafkaLocalBroker.setUp(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment