Skip to content

Instantly share code, notes, and snippets.

@rajkrrsingh
Last active May 19, 2020 11:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save rajkrrsingh/33fb9a4041112dff2c24ec95d8c6cab1 to your computer and use it in GitHub Desktop.
Save rajkrrsingh/33fb9a4041112dff2c24ec95d8c6cab1 to your computer and use it in GitHub Desktop.
Bulk-create Kafka topics from a file using the Kafka admin utilities (AdminUtils)

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

/**
 * KafkaBulkTopicCreate - reads topic definitions from a file and creates each
 * topic through {@link AdminUtils#createTopic}.
 *
 * <p>Each non-blank line of the input file must have the format
 * {@code topicName:partitionCount:replicationFactor}, e.g.
 * <pre>
 * testtopic:3:3
 * testtopic1:3:3
 * </pre>
 * Blank lines are skipped and whitespace around each field is ignored.
 *
 * <p>Processing stops at the first line that is malformed or whose topic
 * cannot be created; topics created before that point are left in place.
 * The process exits with status 1 in that case so callers can detect the failure.
 */
public class KafkaBulkTopicCreate
{
    /** Minimum number of colon-separated fields a topic line must contain. */
    private static final int MIN_FIELDS_PER_LINE = 3;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the ZooKeeper connect string (for several
     *             servers use a comma-separated list such as
     *             {@code "192.168.20.1:2181,192.168.20.2:2181"});
     *             {@code args[1]} is the path of the topic definition file
     */
    public static void main( String[] args ) {
        if(args.length<2){
            System.out.println("usage : java -cp <>.jar zkString <topicfiletoread>");
            System.exit(-1);
        }

        String zookeeperHosts = args[0];
        String file = args[1];
        int sessionTimeOutInMs = 60 * 1000; // 60 secs
        int connectionTimeOutInMs = 30 * 1000; // 30 secs

        boolean failed = false;
        ZkClient zkClient = null;
        // The file is opened before connecting so a bad path is reported without
        // first waiting on ZooKeeper; try-with-resources closes the reader.
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            // Share ONE connection between ZkClient and ZkUtils so ZkUtils does not
            // hold a separate ZkConnection that was never connected.
            ZkConnection zkConnection = new ZkConnection(zookeeperHosts, sessionTimeOutInMs);
            zkClient = new ZkClient(zkConnection, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            // Third argument false: ZooKeeper security is not enabled for this client.
            ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

            String line;
            int lineNumber = 0;
            while((line = reader.readLine()) != null) {
                lineNumber++;
                if (line.trim().isEmpty()) {
                    continue; // tolerate blank lines, e.g. at the end of the file
                }
                String[] currentLine = splitTopicLine(line, lineNumber);
                int partitions = parseIntField(currentLine[1], "partition count", lineNumber);
                int replicationFactor = parseIntField(currentLine[2], "replication factor", lineNumber);

                // No per-topic overrides: every topic is created with an empty config.
                Properties topicConfiguration = new Properties();
                AdminUtils.createTopic(zkUtils, currentLine[0], partitions, replicationFactor, topicConfiguration, RackAwareMode.Enforced$.MODULE$);
                System.out.println("Topic :" + currentLine[0] + " Created with partition:" + currentLine[1] + " replicationFactor:" + currentLine[2]);
            }
        } catch (Exception ex) {
            // Top-level boundary of a CLI tool: report, then signal failure via exit status.
            System.out.println("Failed topic Creation: "+ex.getMessage());
            ex.printStackTrace();
            failed = true;
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

        if (failed) {
            System.exit(1);
        }
    }

    /**
     * Splits one topic definition line into its trimmed, colon-separated fields.
     *
     * @param line       the raw, non-blank line read from the file
     * @param lineNumber 1-based line number, used only for error messages
     * @return the trimmed fields; index 0 is the topic name, 1 the partition
     *         count and 2 the replication factor (any further fields are ignored)
     * @throws IllegalArgumentException if fewer than three fields are present
     *                                  or the topic name is empty
     */
    private static String[] splitTopicLine(String line, int lineNumber) {
        String[] fields = line.trim().split(":");
        if (fields.length < MIN_FIELDS_PER_LINE) {
            throw new IllegalArgumentException("line " + lineNumber
                    + ": expected topicName:partitionCount:replicationFactor but got '" + line + "'");
        }
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
        }
        if (fields[0].isEmpty()) {
            throw new IllegalArgumentException("line " + lineNumber + ": topic name is empty in '" + line + "'");
        }
        return fields;
    }

    /**
     * Parses a numeric field, attaching the line number and field name on failure.
     *
     * @param value      the trimmed field text
     * @param fieldName  human-readable field name for the error message
     * @param lineNumber 1-based line number, used only for error messages
     * @return the parsed integer
     * @throws IllegalArgumentException if {@code value} is not a valid integer
     */
    private static int parseIntField(String value, String fieldName, int lineNumber) {
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("line " + lineNumber + ": " + fieldName
                    + " '" + value + "' is not a valid integer", ex);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment