Skip to content

Instantly share code, notes, and snippets.

@romulojales
Created February 23, 2021 15:56
Show Gist options
  • Save romulojales/59076811c63fbe42a93b88cf65d17981 to your computer and use it in GitHub Desktop.
Save romulojales/59076811c63fbe42a93b88cf65d17981 to your computer and use it in GitHub Desktop.
Simple scala kafka consumer

You can test this gist by simple copying the files into an empty folder and type:

sbt run

You also should have running Kafka. Follow the instructions on https://kafka.apache.org/quickstart

Pay attention, the topic should be quickstart-events.

name := "talknator"
version := "0.1"
scalaVersion := "2.13.5"
libraryDependencies ++= Seq("org.apache.kafka" % "kafka-clients" % "2.7.0")
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import scala.jdk.CollectionConverters._
import java.util.Properties
object KafkaReader extends App {
def getProperties: Properties = {
val properties = new Properties()
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, "talknator")
properties
}
def getConsumer(properties: Properties): KafkaConsumer[String, String] = {
val keyDeserializer = new StringDeserializer
val valueDeserializer = new StringDeserializer
new KafkaConsumer(properties, keyDeserializer, valueDeserializer)
}
def consume(consumer: KafkaConsumer[String, String]): Unit = {
val records = consumer.poll(java.time.Duration.ofMillis(500))
records.forEach { record =>
println(s"Message: ${record.value}. (Partition: ${record.partition}, Offset: ${record.offset})")
}
consumer.commitAsync()
}
def run(): Unit = {
val consumer = getConsumer(getProperties)
consumer.subscribe(Set("quickstart-events").asJava)
println("Starting consumer")
while (true) {
consume(consumer)
}
println("Finishing consumer")
consumer.close()
}
run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment