Skip to content

Instantly share code, notes, and snippets.

@samuelorji
Last active July 20, 2018 00:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samuelorji/462a9c205a9d18ed5745242a1bf5aaf3 to your computer and use it in GitHub Desktop.
Save samuelorji/462a9c205a9d18ed5745242a1bf5aaf3 to your computer and use it in GitHub Desktop.
Simple consumer code for Kafka, only compatible with Kafka 0.10.x
package kafka
import java.util.{Collections, Properties}
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import scala.collection.JavaConverters._
/**
 * Minimal Kafka consumer: subscribes to a single topic and prints every record
 * it receives to standard output.
 *
 * The poll loop runs until the JVM is asked to shut down (e.g. Ctrl-C). A shutdown
 * hook wakes the blocked `poll` call so the loop can exit, and the consumer is
 * always closed in a `finally` block so its network resources and group membership
 * are released even when `poll` throws.
 */
object consuming extends App {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("group.id", "test")
  properties.setProperty("enable.auto.commit", "true")
  properties.setProperty("auto.commit.interval.ms", "1000")
  // NOTE(review): per the Kafka consumer configuration docs, "none" makes the consumer
  // throw when the group has no previously committed offset. Confirm this is intended;
  // "earliest" or "latest" would let a brand-new group start consuming instead.
  properties.setProperty("auto.offset.reset", "none")
  // properties.put("zookeeper-connect","localhost:2181")
  // properties.setProperty("partition.assignment.strategy","org.apache.kafka.clients.consumer.RangeAssignor")

  val TOPIC = "test"
  val consumer = new KafkaConsumer[String, String](properties)

  // Thread running the poll loop; the shutdown hook waits on it so that
  // consumer.close() has finished before the JVM exits.
  private val mainThread = Thread.currentThread()

  // wakeup() is the KafkaConsumer method documented as safe to call from another
  // thread; it makes a blocked poll() throw WakeupException on the polling thread.
  Runtime.getRuntime.addShutdownHook(new Thread {
    override def run(): Unit =
      // If the loop already ended (e.g. poll threw), the consumer is closed: skip.
      if (mainThread.isAlive) {
        consumer.wakeup()
        mainThread.join()
      }
  })

  try {
    consumer.subscribe(util.Collections.singletonList(TOPIC))
    while (true) {
      val records = consumer.poll(100)
      for (record <- records.asScala) {
        println(s"partition : ${record.partition()} --- key : ${record.key()}" +
          s" ---value : ${record.value()} --- topic ${record.topic()} --- timestamp : ${record.timestamp()}")
      }
    }
  } catch {
    // Expected during shutdown: raised by poll() after the hook called wakeup().
    case _: org.apache.kafka.common.errors.WakeupException => ()
  } finally {
    // Release sockets and leave the consumer group on every exit path.
    consumer.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment