Skip to content

Instantly share code, notes, and snippets.

@vamdt
Last active August 22, 2018 05:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vamdt/90ded95c3ff7035a6771912f5b0b6ab7 to your computer and use it in GitHub Desktop.
Save vamdt/90ded95c3ff7035a6771912f5b0b6ab7 to your computer and use it in GitHub Desktop.
Dynamically create a Kafka topic
package com.hello.world;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import joptsimple.internal.Strings;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
/**
 * Static helpers for creating Kafka topics through ZooKeeper using
 * {@code kafka.admin.AdminUtils}.
 */
public class KafkaUtils {

    private static final Logger LOGGER = Logger.getLogger(KafkaUtils.class.getName());

    /** Replication factor used when the caller does not supply one. */
    private static final int DEFAULT_REPLICATION_FACTOR = 2;

    /** ZooKeeper session timeout, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 10 * 1000;

    /** Maximum time to wait for the initial ZooKeeper connection, in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 8 * 1000;

    /**
     * Creates a topic with the default replication factor of
     * {@value #DEFAULT_REPLICATION_FACTOR}.
     *
     * <p>Failures are logged and not propagated to the caller.
     *
     * @param topic      name of the topic to create
     * @param partitions number of partitions for the topic
     * @param zkList     ZooKeeper servers; the entries are joined with {@code ","} to form the
     *                   connection string
     */
    public static void createTopic(String topic, int partitions, List<String> zkList) {
        createTopic(topic, partitions, DEFAULT_REPLICATION_FACTOR, zkList);
    }

    /**
     * Creates a topic with an explicit replication factor.
     *
     * <p>The topic is created with an empty topic configuration and
     * {@code RackAwareMode.Enforced}. Any exception raised while connecting to ZooKeeper or
     * creating the topic is logged at {@code SEVERE} level and not rethrown, so this call is
     * best-effort. The ZooKeeper client is always closed before returning.
     *
     * @param topic       name of the topic to create
     * @param partitions  number of partitions for the topic
     * @param replication replication factor for the topic
     * @param zkList      ZooKeeper servers; the entries are joined with {@code ","} to form the
     *                    connection string
     */
    public static void createTopic(
            String topic, int partitions, int replication, List<String> zkList) {
        String zkServers = Strings.join(zkList, ",");
        ZkClient zkClient = null;
        try {
            // Share one connection between ZkClient and ZkUtils, so ZkUtils never holds a
            // separate ZkConnection that was not opened by the client.
            ZkConnection zkConnection = new ZkConnection(zkServers, SESSION_TIMEOUT_MS);
            zkClient = new ZkClient(
                    zkConnection,
                    CONNECTION_TIMEOUT_MS,
                    ZKStringSerializer$.MODULE$);
            ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

            Properties topicConfig = new Properties();
            AdminUtils.createTopic(
                    zkUtils,
                    topic,
                    partitions,
                    replication,
                    topicConfig,
                    RackAwareMode.Enforced$.MODULE$);
        } catch (Exception e) {
            // Best-effort by contract: report with context and the full cause, do not rethrow.
            LOGGER.log(Level.SEVERE, "Failed to create Kafka topic '" + topic + "'", e);
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment