@ethrbunny
Created August 31, 2016 11:40
Main class for Kafka Streams demo
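import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.slf4j.Logger;          // assumption: the Logger/LoggerFactory pair is SLF4J
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
    public static final String BROKER_IP = "<some ip>:9092";
    public static final String ZOOKEEPER_IP = "<some ip>:2181";
    public static final String TOPIC = "mytopic";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka_test1");
        // Keys and values are both (de)serialized as plain strings.
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        StreamsConfig streamsConfig = new StreamsConfig(config);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
        // BPS2 is not included in this gist; KStream.process() expects a ProcessorSupplier.
        lines.process(new BPS2());

        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
        kafkaStreams.start();
        LOGGER.info("Started Kafka Streams application on topic {}", TOPIC);
    }
}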