Last active
January 22, 2024 20:10
-
-
Save mertant/e8ba52c3d2574c4e26cd46a67f5320d3 to your computer and use it in GitHub Desktop.
FS2 Kafka + ScalaPB example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Dependencies:
Seq(
  "co.fs2" %% "fs2-core" % "3.9.4",
  "com.github.fd4s" %% "fs2-kafka" % "3.2.0",
  "com.thesamet.scalapb" %% "compilerplugin" % "0.11.15",
  "com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.15",
)
*/
import cats.effect.{IO, Resource} | |
import fs2.kafka.{KafkaProducer, ProducerSettings} | |
/** Helpers for producing and consuming ScalaPB-generated protobuf messages with FS2 Kafka.
  *
  * The producer settings assembled here contain only the serializers plus whatever the caller passes in
  * `properties`. In particular no `bootstrap.servers` is set by this object, so callers are expected to
  * supply it (and any other Kafka client configuration) through the `properties` argument.
  */
object ProtoKafkaExample {

  /** Given a ScalaPB-generated message type (and companion) for both the key and value, create an FS2 Kafka
    * producer.
    *
    * @param keyCompanion
    *   companion object of the key message type, used to serialize keys
    * @param valueCompanion
    *   companion object of the value message type, used to serialize values
    * @param properties
    *   additional Kafka producer properties (for example `"bootstrap.servers" -> "localhost:9092"`) added to
    *   the settings; empty by default, which leaves the settings exactly as built from the serializers
    * @return
    *   a resource wrapping the producer
    */
  def producerForPb[KeyPB <: scalapb.GeneratedMessage, ValuePB <: scalapb.GeneratedMessage](
    keyCompanion: scalapb.GeneratedMessageCompanion[KeyPB],
    valueCompanion: scalapb.GeneratedMessageCompanion[ValuePB],
    properties: Map[String, String] = Map.empty,
  ): Resource[IO, KafkaProducer[IO, KeyPB, ValuePB]] =
    producerResource(serializerForPb(keyCompanion), serializerForPb(valueCompanion), properties)

  /** Given a ScalaPB-generated message type (and companion), create an FS2 Kafka serializer (bytes).
    *
    * @param companion
    *   companion object of the message type; its `toByteArray` produces the bytes
    * @return
    *   a serializer that turns a `PB` into the byte array produced by `companion.toByteArray`
    */
  def serializerForPb[PB <: scalapb.GeneratedMessage](
    companion: scalapb.GeneratedMessageCompanion[PB],
  ): fs2.kafka.Serializer[IO, PB] = {
    // The parameter type is spelled out so that `contramap`'s type argument does not depend on
    // expected-type inference.
    fs2.kafka.Serializer.apply[IO].contramap { (pb: PB) => companion.toByteArray(pb) }
    // Alternatively, we can serialize (but not deserialize) without the companion object:
    // fs2.kafka.Serializer.apply[IO].contramap { pb => pb.toByteArray }
  }

  /** Given a ScalaPB-generated message type (and companion), create an FS2 Kafka deserializer (bytes).
    *
    * NOTE(review): the raw record bytes are handed straight to `companion.parseFrom`. If a topic can carry
    * null payloads (e.g. tombstones), presumably the result should be wrapped with `.option` by the caller
    * -- confirm against the consuming code.
    *
    * @param companion
    *   companion object of the message type; its `parseFrom` decodes the bytes
    * @return
    *   a deserializer that maps the record bytes through `companion.parseFrom`
    */
  def deserializerForPb[PB <: scalapb.GeneratedMessage](
    companion: scalapb.GeneratedMessageCompanion[PB],
  ): fs2.kafka.Deserializer[IO, PB] =
    // `parseFrom` is overloaded; typing the argument as `Array[Byte]` pins the intended overload.
    fs2.kafka.Deserializer.apply[IO].map { (bytes: Array[Byte]) => companion.parseFrom(bytes) }

  /** Given a ScalaPB-generated message type (and companion) for values, create an FS2 Kafka producer with an
    * optional String key.
    *
    * @param valueCompanion
    *   companion object of the value message type, used to serialize values
    * @param properties
    *   additional Kafka producer properties (for example `"bootstrap.servers" -> "localhost:9092"`) added to
    *   the settings; empty by default, which leaves the settings exactly as built from the serializers
    * @return
    *   a resource wrapping the producer
    */
  def producerForStringKeyPb[ValuePB <: scalapb.GeneratedMessage](
    valueCompanion: scalapb.GeneratedMessageCompanion[ValuePB],
    properties: Map[String, String] = Map.empty,
  ): Resource[IO, KafkaProducer.PartitionsFor[IO, Option[String], ValuePB]] = {
    val keySerializer: fs2.kafka.Serializer[IO, Option[String]] = fs2.kafka.Serializer.string[IO].option
    producerResource(keySerializer, serializerForPb(valueCompanion), properties)
  }

  /** Builds producer settings from the two serializers, adds `properties`, and opens the producer. */
  private def producerResource[K, V](
    keySerializer: fs2.kafka.Serializer[IO, K],
    valueSerializer: fs2.kafka.Serializer[IO, V],
    properties: Map[String, String],
  ): Resource[IO, KafkaProducer.PartitionsFor[IO, K, V]] = {
    val baseSettings: ProducerSettings[IO, K, V] = ProducerSettings(keySerializer, valueSerializer)
    KafkaProducer.resource(baseSettings.withProperties(properties))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment