Skip to content

Instantly share code, notes, and snippets.

@malcolmgreaves
Forked from asmaier/KafkaProducerIT.java
Last active August 29, 2015 14:21
Show Gist options
  • Save malcolmgreaves/90418113e76d9074be7a to your computer and use it in GitHub Desktop.
Save malcolmgreaves/90418113e76d9074be7a to your computer and use it in GitHub Desktop.
import kafka.admin.CreateTopicCommand;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.*;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * Smoke test that sends a single message through a Kafka producer to a broker
 * started inside the test JVM.
 *
 * <p>The test starts an embedded ZooKeeper and one Kafka broker, creates a topic,
 * waits until the broker has metadata for it, sends one message and shuts
 * everything down again. It contains no assertions: it passes as long as none of
 * these steps throws.
 */
public class KafkaProducerTest {

    /** Id handed to {@code TestUtils.createBrokerConfig} for the single test broker. */
    private final int brokerId = 0;

    /** Name of the topic that is created and written to. */
    private final String topic = "test";

    /**
     * Starts ZooKeeper and a broker, creates {@link #topic}, sends one message and
     * tears everything down.
     *
     * <p>Every resource is released in a {@code finally} block, in the reverse order
     * of creation, so a failure in any step does not leave the embedded servers
     * running for the remainder of the test run.
     *
     * @throws InterruptedException declared for compatibility with the original
     *         signature
     */
    @Test
    public void producerTest() throws InterruptedException {
        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        try {
            ZkClient zkClient =
                    new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
            try {
                // setup Broker
                int port = TestUtils.choosePort();
                Properties props = TestUtils.createBrokerConfig(brokerId, port);
                KafkaConfig config = new KafkaConfig(props);
                Time mock = new MockTime();
                KafkaServer kafkaServer = TestUtils.createServer(config, mock);
                try {
                    // create topic
                    // NOTE(review): the two 1s are presumably partition count and
                    // replication factor, and "" an empty replica assignment - confirm
                    // against the Kafka version in use.
                    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "");

                    // Block until the broker knows about the topic; sending earlier can fail.
                    List<KafkaServer> servers = new ArrayList<KafkaServer>();
                    servers.add(kafkaServer);
                    TestUtils.waitUntilMetadataIsPropagated(
                            scala.collection.JavaConversions.asBuffer(servers), topic, 0, 5000);

                    sendTestMessage(port);
                } finally {
                    kafkaServer.shutdown();
                }
            } finally {
                zkClient.close();
            }
        } finally {
            zkServer.shutdown();
        }
    }

    /**
     * Creates a producer pointed at {@code localhost:port}, sends one message to
     * {@link #topic} and closes the producer, even if the send fails.
     *
     * @param port port the test broker was configured with
     */
    private void sendTestMessage(int port) {
        // setup producer
        Properties properties =
                TestUtils.getProducerConfig("localhost:" + port, "kafka.producer.DefaultPartitioner");
        ProducerConfig pConfig = new ProducerConfig(properties);
        Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
        try {
            // send message
            KeyedMessage<Integer, String> data =
                    new KeyedMessage<Integer, String>(topic, "test-message");
            List<KeyedMessage<Integer, String>> messages =
                    new ArrayList<KeyedMessage<Integer, String>>();
            messages.add(data);
            // The Scala producer takes a Scala sequence, so the Java list is converted.
            producer.send(scala.collection.JavaConversions.asBuffer(messages));
        } finally {
            producer.close();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment