Skip to content

Instantly share code, notes, and snippets.

@gupta-himanshu
Created July 22, 2017 09:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gupta-himanshu/90264f727d0a72c60c31ae34325df273 to your computer and use it in GitHub Desktop.
Save gupta-himanshu/90264f727d0a72c60c31ae34325df273 to your computer and use it in GitHub Desktop.
package com.knoldus.kafka.examples
import java.util.Properties
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.KStreamBuilder
/**
* Copyright Knoldus Software LLP, 2017. All rights reserved.
*/
object StreamApplication {
def main(args: Array[String]): Unit = {
val config = {
val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
properties
}
val builder = new KStreamBuilder()
val sourceStream = builder.stream("SourceTopic")
sourceStream.to("SinkTopic")
val streams = new KafkaStreams(builder, config)
streams.start()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment