Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/789efd4efa2ac92df4b327f7bfccd2d6 to your computer and use it in GitHub Desktop.
Save dacr/789efd4efa2ac92df4b327f7bfccd2d6 to your computer and use it in GitHub Desktop.
Kafka simple usage using java API / published by https://github.com/dacr/code-examples-manager #632dd41b-82c8-445c-b3bd-8f1d54280df0/54afddc723ec45a606589cf474d53c6f147cac9a
// summary : Kafka simple usage using java API
// keywords : scala, scalatest, kafka
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 632dd41b-82c8-445c-b3bd-8f1d54280df0
// created-on : 2020-01-10T16:35:02Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc'
import $ivy.`org.apache.kafka:kafka-clients:2.8.0`
import $ivy.`org.json4s::json4s-jackson:3.6.11`
import $ivy.`org.json4s::json4s-ext:3.6.11`
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import scala.jdk.CollectionConverters._
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.{parse, pretty, render}
// json4s formats made available implicitly to the json4s calls below that accept a `Formats`.
// Uses the `lossless` preset of DefaultFormats.
// The type is spelled out explicitly: implicit definitions without a declared type are
// fragile for implicit resolution in Scala 2 and are rejected by Scala 3.
implicit val formats: org.json4s.Formats = DefaultFormats.lossless
// Kafka consumer reading String keys and String values from the local broker
// (127.0.0.1:9092) as a member of the "default" consumer group.
val consumer = {
  val settings = new java.util.Properties()

  // Plain string settings.
  Seq(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG       -> "127.0.0.1:9092",
    ConsumerConfig.GROUP_ID_CONFIG                -> "default",
    // "earliest": when the group has no committed offset yet, start from the beginning of the topic
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG       -> "earliest",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG      -> "true",
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "1000"
  ).foreach { case (name, setting) => settings.setProperty(name, setting) }

  // Keys and values are both decoded as strings; the deserializer is passed as a Class object.
  Seq(
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
  ).foreach(name => settings.put(name, classOf[StringDeserializer]))

  new KafkaConsumer[String, String](settings)
}
// Subscribe to the "test" topic, then poll forever, printing each record's
// offset/key/value followed by its value pretty-printed as JSON.
consumer.subscribe(List("test").asJava)
try {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(100)).asScala
    for (record <- records) {
      printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
      // A record value may be null or may not be valid JSON. Skip the pretty-printing for
      // such a record instead of letting one bad message terminate the whole consumer.
      Option(record.value).foreach { rawValue =>
        try {
          val json = parse(rawValue)
          println(pretty(render(json)))
        } catch {
          case scala.util.control.NonFatal(err) =>
            Console.err.println(s"offset ${record.offset}: value is not valid JSON (${err.getMessage})")
        }
      }
    }
  }
} finally {
  // Reached only when the loop is left through an exception (e.g. a broker/client failure);
  // closing releases the consumer's network resources and lets it leave the group cleanly.
  consumer.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment