Skip to content

Instantly share code, notes, and snippets.

@soujiro32167
Created December 2, 2022 21:21
Show Gist options
  • Save soujiro32167/4839f66d5ff27a5b446800fcea71b609 to your computer and use it in GitHub Desktop.
Save soujiro32167/4839f66d5ff27a5b446800fcea71b609 to your computer and use it in GitHub Desktop.
Multi-topic sealed trait Kafka producer
import _root_.vulcan.Codec
import _root_.vulcan.generic.*
import foo.Events.{E1, E2}
import fs2.kafka.*
import fs2.kafka.vulcan.{AvroSettings, SchemaRegistryClientSettings, avroSerializer}
import zio.interop.catz.*
import zio.interop.catz.implicits.rts
import zio.{Scope, Task, ZIO, ZIOAppDefault}
/** Demo application that publishes the members of a sealed trait to different Kafka
  * topics through a single producer, serializing the values as Avro.
  */
object foo extends ZIOAppDefault {

  /** Event ADT published by this application. */
  sealed trait Events

  object Events {
    final case class E1(s: String) extends Events
    final case class E2(i: Int) extends Events

    implicit val e1Codec: Codec[E1] = Codec.derive
    implicit val e2Codec: Codec[E2] = Codec.derive

    /** Codec derived for the whole sealed trait; supplies the schema and the decoder
      * used by [[ecodec]].
      */
    val derivedCodec: Codec[Events] = Codec.derive

    /** Codec for [[Events]] that advertises the derived schema and decodes through
      * [[derivedCodec]], but encodes every case with that case's own codec
      * ([[e1Codec]] / [[e2Codec]]).
      */
    implicit val ecodec: Codec[Events] = Codec.instance(
      derivedCodec.schema,
      {
        case e: E1 => e1Codec.encode(e)
        case e: E2 => e2Codec.encode(e)
      },
      (a, s) => derivedCodec.decode(a, s)
    )
  }

  /** Avro settings pointing at the schema registry on `http://localhost:8081`. */
  val avroSettings: AvroSettings[Task] =
    AvroSettings {
      SchemaRegistryClientSettings[Task]("http://localhost:8081")
    }

  /** Builds a Kafka serializer for `A` on top of the Avro serializer created from
    * `settings`.
    *
    * Each value is first encoded with the implicit `Codec[A]`; an encoding error fails
    * the effect with the error's `throwable`, otherwise the encoded value is handed to
    * the underlying serializer together with the target topic.
    *
    * @param settings Avro settings used to create the underlying serializer
    * @tparam A value type; requires a vulcan `Codec` in implicit scope
    * @return an effect producing the serializer
    */
  def serializer[A: Codec](settings: AvroSettings[Task]): Task[Serializer[Task, A]] =
    // NOTE(review): the arguments are presumably `isKey = false` and "no fixed writer
    // schema" -- confirm against the fs2-kafka-vulcan version in use.
    settings.createAvroSerializer(false, None).map {
      case (avro, _) =>
        Serializer.instance { (topic, _, a: A) =>
          Codec[A].encode(a) match {
            case Left(e)  => ZIO.fail(e.throwable)
            case Right(v) => ZIO.attempt(avro.serialize(topic, v))
          }
        }
    }

  /** Scoped Kafka producer with `String` keys and [[Events]] values, connected to
    * `localhost:9092`. Logs "initializing producer" once the producer resource has been
    * acquired.
    */
  val producer: ZIO[Scope, Throwable, KafkaProducer[Task, String, Events]] =
    serializer[Events](avroSettings).flatMap(s =>
      KafkaProducer.resource(
        ProducerSettings[Task, String, Events](
          keySerializer = avroSerializer[String].using(avroSettings),
          valueSerializer = s
        ).withBootstrapServers("localhost:9092")
      ).toScopedZIO <* ZIO.log("initializing producer")
    )

  /** Sends one `E1` to `e1_topic` and one `E2` to `e2_topic`.
    *
    * `produce` yields a nested effect: the outer one hands the record to the producer,
    * the inner one completes with the result of the send. Flattening awaits that inner
    * effect, so a failed delivery fails the app instead of being silently dropped.
    */
  override def run: ZIO[Scope, Throwable, Unit] =
    for {
      p <- producer
      _ <- p
        .produce(ProducerRecords.one(ProducerRecord("e1_topic", "k1", E1("something"))))
        .flatten
      _ <- p
        .produce(ProducerRecords.one(ProducerRecord("e2_topic", "k2", E2(1))))
        .flatten
    } yield ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment