Skip to content

Instantly share code, notes, and snippets.

@koushikmln
Created May 19, 2018 08:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save koushikmln/f6ee5cface5f1f13e52c01946a357872 to your computer and use it in GitHub Desktop.
Save koushikmln/f6ee5cface5f1f13e52c01946a357872 to your computer and use it in GitHub Desktop.
Kafka Consumer Example using Scala
import java.util.{Collections, Properties}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConversions._
/** Minimal Kafka consumer that subscribes to a single topic and prints every
  * record it receives to standard output.
  *
  * Usage: `KafkaConsumerExample <environment>`, where `<environment>` is the
  * path of a section in the configuration loaded by `ConfigFactory.load`. That
  * section must contain a `bootstrap.server` string entry, which is used as the
  * consumer's bootstrap servers setting.
  */
object KafkaConsumerExample {

  /** Topic the consumer subscribes to. */
  private val Topic = "Kafka-Testing"

  /** Consumer group id used for the subscription. */
  private val GroupId = "1"

  /** Deserializer class name used for both record keys and record values. */
  private val StringDeserializerClass =
    "org.apache.kafka.common.serialization.StringDeserializer"

  /** Timeout, in milliseconds, passed to each `poll` call. */
  private val PollTimeoutMs = 500L

  /** Builds the consumer, subscribes to [[Topic]] and prints records forever.
    *
    * @param args command-line arguments; `args(0)` is the configuration
    *             section to read `bootstrap.server` from. When it is missing,
    *             a usage message is printed to stderr and the JVM exits with
    *             status 1.
    */
  def main(args: Array[String]): Unit = {
    // Explicit `.asScala` decorators instead of implicit Java/Scala conversions.
    import scala.collection.JavaConverters._

    val environment = args.headOption.getOrElse {
      System.err.println("Usage: KafkaConsumerExample <environment>")
      sys.exit(1)
    }

    val envProps = ConfigFactory.load.getConfig(environment)

    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getString("bootstrap.server"))
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GroupId)
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializerClass)
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializerClass)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList(Topic))
      // The loop never terminates on its own; it only ends when subscribe/poll
      // (or printing) throws, in which case the finally block closes the consumer.
      while (true) {
        // NOTE(review): newer kafka-clients releases appear to prefer a
        // Duration-based poll overload; the long overload is kept here so the
        // code still compiles against the client version this file targets.
        val records = consumer.poll(PollTimeoutMs)
        for (record <- records.asScala) {
          println(s"Received message: (${record.key()}, ${record.value()}) at offset ${record.offset()}")
        }
      }
    } finally {
      consumer.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment