Skip to content

Instantly share code, notes, and snippets.

@dejan
Last active July 5, 2017 19:04
Show Gist options
  • Save dejan/5028b29d659fc1034fc28621bdfa96a6 to your computer and use it in GitHub Desktop.
Save dejan/5028b29d659fc1034fc28621bdfa96a6 to your computer and use it in GitHub Desktop.
SimpleConsumer
// Minimal sbt build: compiler version first, then project identity.
scalaVersion := "2.12.2"

// Artifact coordinates for this example project.
name := "Example"
version := "1.0"
// Build definition for the `tailtopic` project: a console consumer that
// reads Avro-encoded records from Kafka topics.
lazy val root = (project in file("."))
  .settings(
    // Build-wide defaults shared by every subproject of this build.
    inThisBuild(List(
      organization := "com.example",
      // Same 2.12.x patch level as the top-level `scalaVersion` setting.
      scalaVersion := "2.12.2",
      version := "0.1.0-SNAPSHOT"
    )),
    name := "tailtopic",
    // Extra repository needed for the `io.confluent` artifacts below.
    // Use HTTPS: sbt 1.3+ refuses plain-http resolvers unless they are
    // explicitly allowed, and unencrypted downloads can be tampered with.
    resolvers ++= Seq(
      "Confluent" at "https://packages.confluent.io/maven/"
    ),
    libraryDependencies ++= Seq(
      // NOTE(review): the full broker artifact looks unnecessary for a pure
      // consumer (kafka-clients should suffice) — confirm before removing.
      "org.apache.kafka" %% "kafka" % "0.10.2.1",
      "org.apache.kafka" % "kafka-clients" % "0.10.2.1",
      "io.confluent" % "kafka-avro-serializer" % "3.2.1"
    )
  )
package tailtopic
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
/** Console tailer: subscribes to a fixed set of Kafka topics and prints every
  * record value to standard output until the JVM is asked to shut down.
  *
  * Values are decoded by the Confluent Avro deserializer configured in
  * [[consumerProperties]]; keys are decoded as strings.
  *
  * Shutdown protocol: `KafkaConsumer` is not safe for multi-threaded access,
  * and `wakeup()` is the one method that may be called from another thread.
  * The shutdown hook therefore only wakes the consumer up and waits; the
  * polling thread itself leaves the loop and closes the consumer.
  */
object Main extends App {
  import org.apache.kafka.common.errors.WakeupException

  /** Consumer configuration: broker, schema registry, group and deserializers. */
  val consumerProperties: Properties = {
    val props = new Properties()
    props.setProperty("group.id", "banner-consumer")
    // Offsets are never committed, so with "earliest" every run starts
    // reading from the beginning of each topic.
    props.setProperty("enable.auto.commit", "false")
    props.setProperty("auto.offset.reset", "earliest")
    props.setProperty("schema.registry.url", "http://kafka001:8081")
    props.setProperty("bootstrap.servers", "kafka001:9092")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    props
  }

  // The value type is AnyRef rather than String: the configured Avro
  // deserializer produces decoded Avro objects, not strings.
  val consumer: KafkaConsumer[String, AnyRef] =
    new KafkaConsumer[String, AnyRef](consumerProperties)

  consumer.subscribe(
    Seq(
      "client_request_v1",
      "client_response_v1"
    ).asJava
  )

  // Thread that runs the poll loop below; the shutdown hook waits for it.
  private val pollingThread = Thread.currentThread()

  sys.addShutdownHook {
    println("Shuting down...")
    // Interrupt a blocking poll() from this (foreign) thread, then wait so
    // the JVM does not exit before the polling thread has closed the consumer.
    consumer.wakeup()
    pollingThread.join()
  }

  try {
    while (true) {
      consumer.poll(1000).asScala.foreach { record =>
        // String.valueOf is null-safe, so records with a null value
        // (tombstones) print "null" instead of throwing.
        println(String.valueOf(record.value))
      }
    }
  } catch {
    // Raised by poll() after wakeup(): the expected way out of the loop.
    case _: WakeupException => ()
  } finally {
    // Always close on the polling thread, whatever ended the loop.
    consumer.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment