import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
 * KafkaBulkTopicCreate - reads topic definitions from a file and creates the topics in bulk.
 * <p>
 * Each non-blank line of the source file is expected in the following format:
 * <pre>
 * topicName:partitionCount:replicationFactor
 * </pre>
 * e.g.
 * <pre>
 * testtopic:3:3
 * testtopic1:3:3
 * </pre>
 * Blank lines are skipped. A malformed line, or a topic that fails to be created, is reported
 * and the remaining lines are still processed. The process exits with a non-zero status if
 * anything failed.
 */
public class KafkaBulkTopicCreate
{
    /** ZooKeeper session timeout: 60 seconds. */
    private static final int SESSION_TIMEOUT_MS = 60 * 1000;

    /** ZooKeeper connection timeout: 30 seconds. */
    private static final int CONNECTION_TIMEOUT_MS = 30 * 1000;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the ZooKeeper connect string (several hosts may be given
     *             comma separated, e.g. {@code "192.168.20.1:2181,192.168.20.2:2181"});
     *             {@code args[1]} is the path of the topic definition file to read
     */
    public static void main( String[] args ) {
        if (args.length < 2) {
            System.out.println("usage : java -cp <>.jar zkString <topicfiletoread>");
            System.exit(-1);
        }

        String zookeeperHosts = args[0];
        String file = args[1];
        int failures = 0;
        ZkClient zkClient = null;

        // The file is opened before connecting so that a missing or unreadable file fails
        // fast; try-with-resources closes the reader on every exit path.
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            zkClient = new ZkClient(zookeeperHosts, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String line;
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                String trimmed = line.trim();
                if (trimmed.isEmpty()) {
                    // Blank lines (including a trailing newline at end of file) carry no topic.
                    continue;
                }
                if (!createTopicFromLine(zkUtils, trimmed, lineNumber)) {
                    failures++;
                }
            }
        } catch (Exception ex) {
            // Failures that are not tied to one line: file cannot be read, ZooKeeper unreachable, ...
            System.out.println("Failed topic Creation"+ex.getMessage());
            ex.printStackTrace();
            failures++;
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

        if (failures > 0) {
            // Let calling scripts detect that at least one topic was not created.
            System.exit(1);
        }
    }

    /**
     * Parses one {@code topicName:partitionCount:replicationFactor} line and creates the topic.
     * Problems are reported on standard output instead of being thrown, so that one bad line
     * does not abort the rest of the bulk run.
     *
     * @param zkUtils    ZooKeeper handle passed to {@link AdminUtils#createTopic}
     * @param line       trimmed, non-empty line from the topic definition file
     * @param lineNumber 1-based line number in the file, used only in error messages
     * @return {@code true} if the topic was created, {@code false} if the line was malformed
     *         or the creation failed
     */
    private static boolean createTopicFromLine(ZkUtils zkUtils, String line, int lineNumber) {
        String[] fields = line.split(":");
        if (fields.length < 3) {
            System.out.println("Failed topic Creation: line " + lineNumber + " '" + line
                    + "' is not in the format topicName:partitionCount:replicationFactor");
            return false;
        }

        String topicName = fields[0].trim();
        try {
            int partitions = Integer.parseInt(fields[1].trim());
            int replicationFactor = Integer.parseInt(fields[2].trim());
            // No per-topic overrides: an empty Properties object is passed as the topic config.
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, topicConfiguration, RackAwareMode.Enforced$.MODULE$);
            System.out.println("Topic :" + topicName + " Created with partition:" + partitions + " replicationFactor:" + replicationFactor);
            return true;
        } catch (RuntimeException ex) {
            // Covers a non-numeric count (NumberFormatException) as well as any unchecked
            // exception raised by the createTopic call.
            System.out.println("Failed topic Creation for line " + lineNumber + " '" + line + "': " + ex.getMessage());
            ex.printStackTrace();
            return false;
        }
    }
}
// Source: GitHub gist rajkrrsingh/33fb9a4041112dff2c24ec95d8c6cab1 (last active May 19, 2020 11:13).
// Gist description: create kafka bulk topic from admin client