// Last active April 2, 2023 10:12
// summary : Simple Kafka usage with the Java API
// keywords : scala, scalatest, kafka
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0
// id : 632dd41b-82c8-445c-b3bd-8f1d54280df0
// created-on : 2020-01-10T16:35:02Z
// managed-by :
// execution : scala ammonite script - run as follows: 'amm'
import $ivy.`org.apache.kafka:kafka-clients:2.8.0`
import $ivy.`org.json4s::json4s-jackson:3.6.11`
import $ivy.`org.json4s::json4s-ext:3.6.11`
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import scala.jdk.CollectionConverters._
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.{parse, pretty, render}
// json4s formats picked up implicitly by the JSON helpers used in this script.
// `lossless` is the DefaultFormats variant whose date format keeps milliseconds.
// The type is spelled out because implicit definitions without an explicit type
// make implicit resolution fragile (and are rejected by Scala 3).
implicit val formats: org.json4s.Formats = DefaultFormats.lossless
// Kafka consumer with String keys and String values, member of the "default"
// consumer group, with periodic automatic offset commits.
//
// The broker list is taken from the KAFKA_BOOTSTRAP_SERVERS environment variable
// and falls back to "localhost:9092" (the conventional local broker address); an
// empty broker list cannot be resolved by the client and makes the constructor fail.
val consumer: KafkaConsumer[String, String] = {
  import ConsumerConfig._
  val bootstrapServers = sys.env.getOrElse("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
  val props = new java.util.Properties()
  props.setProperty(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  props.setProperty(GROUP_ID_CONFIG, "default")
  // When the group has no committed offset yet, start reading from the beginning of the log.
  props.setProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.setProperty(ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.setProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  // Properties is meant to hold String values, so the deserializers are given by class name.
  props.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  new KafkaConsumer[String, String](props)
}
while ( true ) {
val records = consumer.poll(Duration.ofMillis(100)).asScala
for (record <- records) {
printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
val json = parse(record.value)
