Skip to content

Instantly share code, notes, and snippets.

@Vilkina
Created September 15, 2020 08:37
Show Gist options
  • Save Vilkina/314a1ebedf35da385c08379f70ee6d57 to your computer and use it in GitHub Desktop.
Save Vilkina/314a1ebedf35da385c08379f70ee6d57 to your computer and use it in GitHub Desktop.
/** Example wiring of a zio-kafka consumer (ZIO 1.x style environment types).
  *
  * Shows two ways of consuming the same subscription:
  *   - [[readKafka]] passes the settings directly to `Consumer.consumeWith`;
  *   - `readKafka2` requires a `Consumer` in the environment, which
  *     [[readKafkaPrg]] supplies through a layer built from `Consumer.make`.
  */
object Test {
  import zio._
  // `Blocking` and `Clock` are type aliases living in their own packages;
  // `import zio._` alone does not bring them into scope.
  import zio.blocking.Blocking
  import zio.clock.Clock
  import zio.console._
  import zio.duration._
  import zio.kafka.consumer._
  import zio.kafka.serde._

  /** Consumer configuration: one local bootstrap server, fixed group id and
    * client id, and a 30 second close timeout.
    */
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("group")
      .withClientId("client")
      .withCloseTimeout(30.seconds)

  /** Subscription to the single topic named "topic". */
  val subscription: Subscription = Subscription.topics("topic")

  /** Consumes `subscription` using `settings`, reading keys and values as
    * strings and printing each received pair to the console.
    */
  val readKafka: RIO[Console with Blocking with Clock, Unit] =
    Consumer.consumeWith(settings, subscription, Serde.string, Serde.string) {
      case (key, value) =>
        // Perform an effect with the received message
        putStrLn(s"Received message ${key}: ${value}")
    }

  /** Same console-printing handler as [[readKafka]], but run against a
    * `Consumer` service taken from the environment instead of building one
    * from `settings` here.
    */
  private val readKafka2: RIO[ZEnv with Consumer, Unit] =
    Consumer.withConsumerService(
      _.consumeWith(subscription, Serde.string, Serde.string) {
        case (key, value) =>
          putStrLn(s"Received message ${key}: ${value}")
      }
    )

  /** Managed consumer service created from `settings`. */
  private val service: ZManaged[Clock with Blocking, Throwable, Consumer.Service] =
    Consumer.make(settings)

  /** Layer providing `Consumer`.
    *
    * `ZLayer.fromManaged` wraps the managed service in `Has`, which is what
    * the `Consumer` alias (`Has[Consumer.Service]`) requires; plain
    * `ZLayer(service)` would expose the bare `Consumer.Service` and not match
    * the declared output type.
    */
  private val layer: ZLayer[Clock with Blocking, Throwable, Consumer] =
    ZLayer.fromManaged(service)

  /** Runnable program: `readKafka2` with the consumer layer supplied on top of
    * the default environment, mapped to an `ExitCode`.
    */
  val readKafkaPrg: URIO[ZEnv, ExitCode] =
    readKafka2.provideCustomLayer(layer).exitCode
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment