@fjavieralba
Last active March 23, 2021 09:57
Embedding Kafka+Zookeeper for testing purposes. Tested with Apache Kafka 0.8
// KafkaLocal.java
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class KafkaLocal {

    public KafkaServerStartable kafka;
    public ZooKeeperLocal zookeeper;

    public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException {
        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);

        // start local zookeeper (the broker connects to it on startup)
        System.out.println("starting local zookeeper...");
        zookeeper = new ZooKeeperLocal(zkProperties);
        System.out.println("done");

        // start local kafka broker
        kafka = new KafkaServerStartable(kafkaConfig);
        System.out.println("starting local kafka broker...");
        kafka.startup();
        System.out.println("done");
    }

    public void stop() {
        // stop kafka broker; the embedded ZooKeeper thread is not stopped here
        System.out.println("stopping kafka...");
        kafka.shutdown();
        System.out.println("done");
    }
}
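// MyTest.java
import static org.junit.Assert.fail;

import java.util.Properties;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class MyTest {

    static KafkaLocal kafka;

    @BeforeClass
    public static void startKafka() {
        Properties kafkaProperties = new Properties();
        Properties zkProperties = new Properties();

        try {
            // load properties from the test classpath
            kafkaProperties.load(MyTest.class.getResourceAsStream("/kafkalocal.properties"));
            zkProperties.load(MyTest.class.getResourceAsStream("/zklocal.properties"));

            // start kafka
            kafka = new KafkaLocal(kafkaProperties, zkProperties);
            // give the broker time to finish starting
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace(System.out);
            fail("Error running local Kafka broker");
        }

        // do other things
    }

    @AfterClass
    public static void stopKafka() {
        // shut the broker down so it does not outlive the tests
        if (kafka != null) {
            kafka.stop();
        }
    }

    @Test
    public void testSomething() {
    }
}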
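The test above waits a fixed five seconds for the broker to come up. As an alternative, one could poll the broker's port until it accepts connections. The following is only a sketch: `PortWaiter` and `waitForPort` are hypothetical names that are not part of this gist, and an open port shows that the socket is listening, not necessarily that the broker is fully ready to serve requests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the gist): polls a TCP port until it
// accepts a connection or the timeout expires.
public class PortWaiter {

    public static boolean waitForPort(String host, int port, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                // try to connect, giving up on this attempt after 200 ms
                socket.connect(new InetSocketAddress(host, port), 200);
                return true;
            } catch (IOException e) {
                // nothing is listening yet; pause briefly and retry
                Thread.sleep(100);
            }
        }
        return false;
    }
}
```

In `startKafka()` this could replace `Thread.sleep(5000)` with something like `PortWaiter.waitForPort("localhost", 9092, 10000)`, assuming the broker is configured to listen on port 9092.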
// ZooKeeperLocal.java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;

import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

public class ZooKeeperLocal {

    ZooKeeperServerMain zooKeeperServer;

    public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException {
        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
        try {
            quorumConfiguration.parseProperties(zkProperties);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        zooKeeperServer = new ZooKeeperServerMain();
        final ServerConfig configuration = new ServerConfig();
        configuration.readFrom(quorumConfiguration);

        // runFromConfig blocks, so run the server on its own thread
        new Thread() {
            public void run() {
                try {
                    zooKeeperServer.runFromConfig(configuration);
                } catch (IOException e) {
                    System.out.println("ZooKeeper Failed");
                    e.printStackTrace(System.err);
                }
            }
        }.start();
    }
}
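The test loads `/kafkalocal.properties` and `/zklocal.properties`, which are not included in this gist. The sketch below builds comparable settings in code, as a rough guide to what such files might contain. `LocalProperties` is a hypothetical helper, and the ports and directories are assumptions. The key names are the ones I understand Kafka 0.8 and standalone ZooKeeper to read, so check them against the versions you use.

```java
import java.util.Properties;

// Hypothetical helper (not part of the gist): builds minimal settings that
// could stand in for kafkalocal.properties and zklocal.properties.
public class LocalProperties {

    // Broker settings, using Kafka 0.8 key names (assumed; verify for your version).
    public static Properties kafka(int brokerPort, int zkPort, String logDir) {
        Properties p = new Properties();
        p.setProperty("broker.id", "0");                             // unique id of this broker
        p.setProperty("port", String.valueOf(brokerPort));           // port the broker listens on
        p.setProperty("log.dir", logDir);                            // where message logs are stored
        p.setProperty("zookeeper.connect", "localhost:" + zkPort);   // embedded ZooKeeper address
        return p;
    }

    // Standalone ZooKeeper settings, as parsed by QuorumPeerConfig.parseProperties.
    public static Properties zookeeper(int zkPort, String dataDir) {
        Properties p = new Properties();
        p.setProperty("dataDir", dataDir);                   // snapshot and transaction log directory
        p.setProperty("clientPort", String.valueOf(zkPort)); // port clients (the broker) connect to
        p.setProperty("tickTime", "2000");                   // basic time unit in milliseconds
        return p;
    }
}
```

With these, the broker could be started as `new KafkaLocal(LocalProperties.kafka(9092, 2181, "/tmp/kafka-logs"), LocalProperties.zookeeper(2181, "/tmp/zookeeper"))`. The equivalent properties files would hold the same keys, one `key=value` pair per line.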
@PrashantSabnekar

If anybody has tried to run Kafka locally (embedded mode), please share the properties file.

@jamesrgrinter

If you want to see a complete implementation of this, Apache Flume uses it in some of its tests.

e.g. flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java, with TestUtil wrapping it.

@sahlone

sahlone commented Aug 15, 2018

How can it use the Kafka Scala library in Java code?
