@robvadai
Last active October 20, 2017 20:26
An example of starting a Kafka Streams application
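import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import scala.util.Try
import scala.util.control.NonFatal

// consumerGroup, kafkaServers and Logger are assumed to be defined elsewhere.
val kafkaStreamsConfig: Properties = new Properties()
kafkaStreamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroup)
kafkaStreamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)

// Assuming streamBuilder is already defined with the transformation pipeline
val kafkaStreams = new KafkaStreams(streamBuilder, kafkaStreamsConfig)

// start() launches the stream threads and returns without blocking,
// so this only catches failures raised while starting up.
Try {
  Logger.info("Starting Kafka Streams...")
  kafkaStreams.start()
}.recover {
  case NonFatal(e) =>
    Logger.error("Encountered an unrecoverable error, stopping application.", e)
    kafkaStreams.close()
    sys.exit(1)
}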