You can test this gist by simple copying the files into an empty folder and type:
sbt run
You also should have running Kafka. Follow the instructions on https://kafka.apache.org/quickstart
Pay attention, the topic should be quickstart-events
.
You can test this gist by simple copying the files into an empty folder and type:
sbt run
You also should have running Kafka. Follow the instructions on https://kafka.apache.org/quickstart
Pay attention, the topic should be quickstart-events
.
name := "talknator" | |
version := "0.1" | |
scalaVersion := "2.13.5" | |
libraryDependencies ++= Seq("org.apache.kafka" % "kafka-clients" % "2.7.0") |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
import org.apache.kafka.clients.CommonClientConfigs | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import scala.jdk.CollectionConverters._ | |
import java.util.Properties | |
object KafkaReader extends App { | |
def getProperties: Properties = { | |
val properties = new Properties() | |
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") | |
properties.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, "talknator") | |
properties | |
} | |
def getConsumer(properties: Properties): KafkaConsumer[String, String] = { | |
val keyDeserializer = new StringDeserializer | |
val valueDeserializer = new StringDeserializer | |
new KafkaConsumer(properties, keyDeserializer, valueDeserializer) | |
} | |
def consume(consumer: KafkaConsumer[String, String]): Unit = { | |
val records = consumer.poll(java.time.Duration.ofMillis(500)) | |
records.forEach { record => | |
println(s"Message: ${record.value}. (Partition: ${record.partition}, Offset: ${record.offset})") | |
} | |
consumer.commitAsync() | |
} | |
def run(): Unit = { | |
val consumer = getConsumer(getProperties) | |
consumer.subscribe(Set("quickstart-events").asJava) | |
println("Starting consumer") | |
while (true) { | |
consume(consumer) | |
} | |
println("Finishing consumer") | |
consumer.close() | |
} | |
run() | |
} |