Skip to content

Instantly share code, notes, and snippets.

@dosht
Created July 7, 2020 03:33
Show Gist options
  • Save dosht/24004921baf79fcb9a62a1b7a7a26eb5 to your computer and use it in GitHub Desktop.
Save dosht/24004921baf79fcb9a62a1b7a7a26eb5 to your computer and use it in GitHub Desktop.
import java.time.Duration
import java.util.{Collections, Properties}
import cats.effect.{IO, Resource, Timer}
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig => KafkaConsumerConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
/** Connection settings for a [[Consumer]].
  *
  * @param servers       value stored under the Kafka `bootstrap.servers` property
  * @param topic         the single topic the consumer subscribes to
  * @param consumerGroup value stored under the Kafka `group.id` property
  */
case class ConsumerConfig(
  servers: String,
  topic: String,
  consumerGroup: String
)
/** Polls a single Kafka topic and forwards each polled batch to a [[MessageHandler]].
  *
  * Every call into the underlying `KafkaConsumer` is wrapped with `kafkaContext ~> ...`.
  * NOTE(review): `KafkaContext` and its `~>` operator are defined outside this file;
  * presumably they lift the by-name expression into `IO` on a dedicated context — confirm.
  *
  * @param messageHandler receives the list of records returned by each poll
  * @param config         bootstrap servers, topic and consumer group
  * @param kafkaContext   context used to run every Kafka client call
  */
class Consumer(messageHandler: MessageHandler, config: ConsumerConfig, kafkaContext: KafkaContext) {
  // Local import: the file-level imports only bring in StringDeserializer.
  import org.apache.kafka.common.serialization.ByteArrayDeserializer

  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  /** Client properties handed to the Kafka consumer. */
  val props = new Properties()
  props.put(KafkaConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.servers)
  props.put(KafkaConsumerConfig.GROUP_ID_CONFIG, config.consumerGroup)

  // Lazy, so the underlying client is only constructed on first use.
  // Keys are decoded as strings; values are kept as raw bytes, so the value
  // deserializer must be a ByteArrayDeserializer to match the declared value type.
  private lazy val kafkaConsumer: KafkaConsumer[String, Array[Byte]] =
    new KafkaConsumer[String, Array[Byte]](props, new StringDeserializer, new ByteArrayDeserializer)

  /** Closes the underlying Kafka consumer. */
  def close: IO[Unit] = kafkaContext ~> kafkaConsumer.close()

  /** Subscribes the underlying consumer to `config.topic`. */
  def subscribe(): IO[Unit] = kafkaContext ~> kafkaConsumer.subscribe(Collections.singletonList(config.topic))

  /** Polls (waiting up to one second per poll), hands the batch to the handler, and repeats.
    *
    * The loop has no exit condition of its own; it ends only if a step fails or the
    * resulting `IO` is cancelled.
    */
  def consume: IO[Unit] =
    (kafkaContext ~> kafkaConsumer.poll(Duration.ofMillis(1000)).iterator().asScala.toList)
      .flatMap(messages => messageHandler.handleMessage(messages))
      // The recursive call is the last step of the flatMap chain. A for-comprehension
      // ending in `yield ()` would append a `map` after every iteration and retain
      // one pending continuation per poll.
      .flatMap(_ => consume)
}
/** Factory helpers for [[Consumer]]. */
object Consumer {

  /** Wraps a [[Consumer]] in a `Resource` whose finalizer runs the consumer's `close`.
    *
    * Acquisition only constructs the consumer; it does not call `subscribe()` or `consume`.
    *
    * @param messageHandler handler passed through to the consumer
    * @param config         consumer settings
    * @param context        Kafka context passed through to the consumer
    */
  def resource(messageHandler: MessageHandler, config: ConsumerConfig, context: KafkaContext): Resource[IO, Consumer] = {
    val acquire: IO[Consumer] = IO(new Consumer(messageHandler, config, context))
    Resource.make(acquire)(consumer => consumer.close)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment