Skip to content

Instantly share code, notes, and snippets.

@shankarshastri
Created April 28, 2020 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shankarshastri/c3fad5fb8ff109dd5e4b327622d05783 to your computer and use it in GitHub Desktop.
Save shankarshastri/c3fad5fb8ff109dd5e4b327622d05783 to your computer and use it in GitHub Desktop.
KafkaProducerConsumerAvro
import java.io.ByteArrayOutputStream
import java.time.Duration
import java.util.concurrent.Executors
import java.util.{Properties, Timer, TimerTask, UUID}
import Models.{Message, SystemMessage, UserMessage, msgAvroSchema}
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization._
import scala.concurrent._
import scala.util.Try
object SAKPC extends App {
/** Message model that is Avro-encoded and published to the Kafka topic. */
object Models {

  /** Root of the message hierarchy; sealed, so every variant is declared in this object. */
  sealed trait Message

  /** Message variant that carries only a name. */
  case class UserMessage(name: String) extends Message

  /**
   * Message variant with a name, an optional error and two further optional fields.
   *
   * @param name     message text
   * @param error    optional error description
   * @param newField optional field, `None` unless supplied
   * @param supField optional field, `None` unless supplied
   */
  case class SystemMessage(
    name: String,
    error: Option[String],
    newField: Option[String] = None,
    supField: Option[String] = None
  ) extends Message

  /** Avro schema for [[Message]], obtained from avro4s' `AvroSchema`. */
  val msgAvroSchema = AvroSchema[Message]
}
/**
 * Adds a `Future`-returning `sendAsync` to [[KafkaProducer]].
 *
 * @tparam K record key type
 * @tparam V record value type
 */
implicit class KafkaUtil[K, V](kP: KafkaProducer[K, V]) {

  /**
   * Sends `record` and exposes the outcome of the producer callback as a Future.
   *
   * Every failure is reported through the returned Future, including exceptions that
   * `send` throws on the calling thread before the record is handed to the callback,
   * so callers only need one error-handling path.
   *
   * @param record the record to publish
   * @return a Future completed with the record's metadata, or failed with the send error
   */
  def sendAsync(record: ProducerRecord[K, V]): Future[RecordMetadata] = {
    import scala.util.control.NonFatal

    val p = Promise[RecordMetadata]()
    try {
      kP.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          // The callback signals success with a null exception.
          Option(exception) match {
            case None =>
              p.trySuccess(metadata)
            case Some(error) =>
              error.printStackTrace()
              p.tryFailure(error)
          }
          ()
        }
      })
    } catch {
      // `send` itself threw: surface it through the Future instead of the call site.
      // tryFailure keeps this safe even if the callback already completed the promise.
      case NonFatal(error) => p.tryFailure(error)
    }
    p.future
  }
}
/**
 * Adds timer-driven polling to [[KafkaConsumer]].
 *
 * @tparam K record key type
 * @tparam V record value type
 */
implicit class KafkaUtilForConsumer[K, V](kC: KafkaConsumer[K, V]) {
  import scala.jdk.CollectionConverters._

  // One Timer per wrapper instance; every task scheduled below runs on it.
  private val timer = new Timer()

  /**
   * Schedules a repeating task that polls the consumer and hands the records to `callback`.
   *
   * The first poll happens after `pollLongMillis` ms and the same value is used both as
   * the delay between runs and as the poll timeout. A `WakeupException` is treated as the
   * stop signal: the consumer is closed and the timer is cancelled so that no further poll
   * is attempted on the closed consumer. Any other exception (from the poll or from
   * `callback`) is passed to `exceptionHandler` and polling continues.
   *
   * @param pollLongMillis   delay between polls and poll timeout, in milliseconds
   * @param callback         invoked with each polled batch of records
   * @param exceptionHandler invoked with any non-wakeup exception
   * @param ec               not used by the current implementation; kept so existing call
   *                         sites keep compiling
   */
  def scheduledPoll(pollLongMillis: Long, callback: Iterable[ConsumerRecord[K, V]] => Try[Unit],
                    exceptionHandler: Exception => Try[Unit])
                   (implicit ec: ExecutionContext): Unit = {
    val timerTask = new TimerTask {
      override def run(): Unit = {
        // The resulting Try is discarded: failures have already been routed to a handler.
        Try(kC.poll(Duration.ofMillis(pollLongMillis)))
          .flatMap(records => callback(records.asScala))
          .recoverWith {
            case _: WakeupException =>
              Try {
                // Cancel even if close() throws, otherwise the timer keeps polling a
                // closed consumer and its thread never terminates.
                try kC.close()
                finally timer.cancel()
              }
            case ex: Exception => exceptionHandler(ex)
          }
        ()
      }
    }
    timer.schedule(timerTask, pollLongMillis, pollLongMillis)
  }
}
// Topic used by both the producer and the consumer below.
val topicName = "sample-hello-world-avro"

// Producer settings: String keys, raw byte values (the Avro JSON encoding is done by hand below).
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
// The setting is spelled "acks"; the previous key "ack" is not a producer setting,
// so the requested "all" acknowledgement level was never applied.
producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
// NOTE(review): "timeout.ms" does not appear among the producer settings of current Kafka
// clients ("request.timeout.ms" looks like its replacement) - confirm against the client
// version in use before changing it.
producerProps.put("timeout.ms", "5000")
// NOTE(review): batch.size is documented in bytes, so 10 presumably disables batching in
// practice - confirm this is intended.
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "10")
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "1000")
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")

// Consumer settings mirror the producer's key/value formats.
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-tutorial")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

// Implicit execution context picked up by Future.sequence and scheduledPoll below.
implicit val producerExecutionContext: ExecutionContextExecutor =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

val producer = new KafkaProducer[String, Array[Byte]](producerProps)
val consumer = new KafkaConsumer[String, Array[Byte]](consumerProps)
consumer.subscribe(java.util.Collections.singletonList(topicName))

// Not referenced anywhere in this block; kept because it is a member of the enclosing object.
val consumerExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

// Registered before any blocking call so that it is in place even if the JVM is
// interrupted while records are still being produced, or if the wait below fails.
scala.sys.addShutdownHook {
  println("Running shutdown hook")
  producer.flush()
  // wakeup() makes the consumer's poll throw WakeupException, which stops the polling.
  consumer.wakeup()
  producer.close()
}

// Produce 100 records, each randomly either a UserMessage or a SystemMessage,
// encoded as Avro JSON and keyed by a fresh UUID.
val f = (1 to 100).map { _ =>
  val key = UUID.randomUUID()
  val data: Message =
    if (scala.util.Random.between(1, 100) % 2 == 0) UserMessage(s"Hello ${key}")
    else SystemMessage(s"Hello System Message ${key}", None, Some("Super"), Some("ResultValue"))
  val b = new ByteArrayOutputStream()
  val k = AvroOutputStream.json[Message].to(b).build(msgAvroSchema)
  try {
    k.write(data)
    k.flush()
  } finally {
    k.close()
  }
  producer.sendAsync(new ProducerRecord[String, Array[Byte]](topicName, key.toString, b.toByteArray))
}
val producerFut = Future.sequence(f)

// Poll every 10 seconds and print each batch. Poll failures are printed rather than
// silently dropped. scheduledPoll returns Unit, so this val carries no result.
val consumeFut = consumer.scheduledPoll(
  10000,
  records => Try(println(records.toList)),
  ex => Try(ex.printStackTrace())
)

// Block the main thread until every send has been acknowledged or one has failed.
Await.result(producerFut, scala.concurrent.duration.Duration.Inf)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment