Skip to content

Instantly share code, notes, and snippets.

@benstopford
Last active October 20, 2021 19:47
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save benstopford/49555b2962f93f6d50e3 to your computer and use it in GitHub Desktop.
Save benstopford/49555b2962f93f6d50e3 to your computer and use it in GitHub Desktop.
Kafka Testing at its Most Simple
package com.confluent.benstopford;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
/**
 * Kafka testing at its most simple: starts a Curator {@code TestingServer} (ZooKeeper) and a
 * Kafka broker from within the test, sends a single message with {@code KafkaProducer} and
 * reads it back through the high-level consumer.
 *
 * <p>You'll need the following in your pom:
 *
 * <pre>{@code
 * <dependencies>
 *   <dependency>
 *     <groupId>org.apache.kafka</groupId>
 *     <artifactId>kafka_2.10</artifactId>
 *     <version>0.8.2.1</version>
 *   </dependency>
 *   <dependency>
 *     <groupId>junit</groupId>
 *     <artifactId>junit</artifactId>
 *     <version>4.11</version>
 *     <scope>test</scope>
 *   </dependency>
 *   <dependency>
 *     <groupId>org.apache.curator</groupId>
 *     <artifactId>curator-test</artifactId>
 *     <version>2.8.0</version>
 *     <scope>test</scope>
 *   </dependency>
 * </dependencies>
 * }</pre>
 */
public class KafkaMostBasicTest {

    /** Topic used by the test; suffixed with the current time so each JVM run gets a fresh topic. */
    public static final String topic = "topic1-" + System.currentTimeMillis();

    private KafkaTestFixture server;
    private Producer<String, String> producer;
    private ConsumerConnector consumerConnector;

    /** Starts ZooKeeper and the Kafka broker before each test. */
    @Before
    public void setup() throws Exception {
        server = new KafkaTestFixture();
        server.start(serverProperties());
    }

    /**
     * Releases the producer, the consumer and the embedded servers.
     *
     * <p>JUnit runs {@code @After} even when {@code @Before} or the test body failed part-way,
     * so any of the fields may still be {@code null}. Each resource is therefore null-checked
     * and released in a {@code finally} chain so that one failure neither masks the original
     * test error with a {@code NullPointerException} nor leaves the servers running.
     */
    @After
    public void teardown() throws Exception {
        try {
            if (producer != null) {
                producer.close();
            }
        } finally {
            try {
                if (consumerConnector != null) {
                    consumerConnector.shutdown();
                }
            } finally {
                if (server != null) {
                    server.stop();
                }
            }
        }
    }

    /** Sends one message to {@link #topic} and asserts that the consumer reads the same value. */
    @Test
    public void shouldWriteThenRead() throws Exception {
        // Create a consumer
        ConsumerIterator<String, String> it = buildConsumer(KafkaMostBasicTest.topic);

        // Create a producer
        producer = new KafkaProducer<String, String>(producerProps());

        // Send a message; get() waits for the send to complete before we try to read.
        producer.send(new ProducerRecord<String, String>(KafkaMostBasicTest.topic, "message")).get();

        // Read it back. No consumer.timeout.ms is configured, so next() waits for a message.
        MessageAndMetadata<String, String> messageAndMetadata = it.next();
        String value = messageAndMetadata.message();
        assertThat(value, is("message"));
    }

    /**
     * Creates the consumer connector (stored in {@link #consumerConnector} so that
     * {@link #teardown()} can shut it down) and returns an iterator over a single stream
     * of the given topic, decoding keys and values as strings.
     *
     * @param topic the topic to subscribe to
     * @return iterator over the messages of the topic's only stream
     */
    private ConsumerIterator<String, String> buildConsumer(String topic) {
        Properties props = consumerProperties();

        // One stream (consumer thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, List<KafkaStream<String, String>>> consumers =
                consumerConnector.createMessageStreams(
                        topicCountMap, new StringDecoder(null), new StringDecoder(null));
        KafkaStream<String, String> stream = consumers.get(topic).get(0);
        return stream.iterator();
    }

    /** Builds the consumer configuration, pointing at the same ZooKeeper as the broker. */
    private Properties consumerProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", serverProperties().get("zookeeper.connect"));
        props.put("group.id", "group1");
        // Start from the earliest available offset when the group has no committed offset.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    /** Builds the producer configuration with string key/value serializers. */
    private Properties producerProps() {
        Properties props = new Properties();
        // NOTE(review): assumes the broker listens on its default port 9092, since
        // serverProperties() does not set one.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "acks" is the KafkaProducer setting; "request.required.acks" belongs to the old
        // Scala producer and is not a KafkaProducer config key.
        props.put("acks", "1");
        return props;
    }

    /** Builds the minimal broker configuration: ZooKeeper address and broker id. */
    private Properties serverProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("broker.id", "1");
        return props;
    }

    /** Owns the ZooKeeper test server and the Kafka broker used by the test. */
    private static class KafkaTestFixture {
        private TestingServer zk;
        private KafkaServerStartable kafka;

        /**
         * Starts ZooKeeper on the port named in {@code zookeeper.connect}, then the broker.
         *
         * <p>If this method throws part-way, {@link #stop()} still cleans up whatever was
         * started.
         *
         * @param properties broker configuration; must contain {@code zookeeper.connect}
         */
        public void start(Properties properties) throws Exception {
            int port = getZkPort(properties);
            zk = new TestingServer(port);
            zk.start();

            KafkaConfig kafkaConfig = new KafkaConfig(properties);
            kafka = new KafkaServerStartable(kafkaConfig);
            kafka.startup();
        }

        /**
         * Shuts down the broker and then ZooKeeper. Safe to call after a failed or partial
         * {@link #start(Properties)}: components that were never created are skipped, and
         * ZooKeeper is closed even if the broker shutdown throws.
         */
        public void stop() throws IOException {
            try {
                if (kafka != null) {
                    kafka.shutdown();
                }
            } finally {
                if (zk != null) {
                    try {
                        zk.stop();
                    } finally {
                        zk.close();
                    }
                }
            }
        }

        /**
         * Extracts the port from a {@code host:port} ZooKeeper connect string.
         *
         * @throws IllegalArgumentException if the property is missing or has no port part
         * @throws NumberFormatException if the port part is not a number
         */
        private int getZkPort(Properties properties) {
            String url = properties.getProperty("zookeeper.connect");
            if (url == null) {
                throw new IllegalArgumentException("zookeeper.connect is not set");
            }
            String[] hostAndPort = url.split(":");
            if (hostAndPort.length < 2) {
                throw new IllegalArgumentException(
                        "zookeeper.connect must be of the form host:port but was: " + url);
            }
            return Integer.parseInt(hostAndPort[1]);
        }
    }
}
@spasaro
Copy link

spasaro commented Jan 31, 2019

Thanks for posting this! It helped me a lot!

@MaryCooperGD
Copy link

Hi, when I try to run the test I have these errors:

image

First one is

KafkaConfig kafkaConfig = new KafkaConfig(properties);

while the other is for

producer.close();

Do you know what the error could be?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment