Skip to content

Instantly share code, notes, and snippets.

@dosht
Created July 7, 2020 03:34
Show Gist options
  • Save dosht/79da2bc311c7d17bb9de3a8f0d5f9c78 to your computer and use it in GitHub Desktop.
Save dosht/79da2bc311c7d17bb9de3a8f0d5f9c78 to your computer and use it in GitHub Desktop.
import cats.effect.{ConcurrentEffect, ExitCode, IO, IOApp}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
object ConsumerApplication extends IOApp {
implicit private val ec: ExecutionContextExecutor = ExecutionContext.global
private val cs = IO.contextShift(ec)
implicit private val concurrentEffect: ConcurrentEffect[IO] = IO.ioConcurrentEffect(cs)
val config = ConsumerConfig("localhost:9092", "test-topic", "consumer-group-1")
override def run(args: List[String]): IO[ExitCode] = {
val applicationResource = for {
kafkaContext <- KafkaContext.resource(cs)
consumer <- Consumer.resource(new MessageHandler, config, kafkaContext)
} yield consumer
applicationResource.use(consumer => consumer.subscribe *> consumer.consume).as(ExitCode.Success)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment