Skip to content

Instantly share code, notes, and snippets.

@mertant
Last active January 22, 2024 20:10
Show Gist options
  • Save mertant/e8ba52c3d2574c4e26cd46a67f5320d3 to your computer and use it in GitHub Desktop.
Save mertant/e8ba52c3d2574c4e26cd46a67f5320d3 to your computer and use it in GitHub Desktop.
FS2 Kafka + ScalaPB example
/* Dependencies:
Seq(
"co.fs2" %% "fs2-core" % "3.9.4",
"com.github.fd4s" %% "fs2-kafka" % "3.2.0",
"com.thesamet.scalapb" %% "compilerplugin" % "0.11.15",
"com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.15",
)
*/
import cats.effect.{IO, Resource}
import fs2.kafka.{KafkaProducer, ProducerSettings}
object ProtoKafkaExample {

  /** Creates an FS2 Kafka producer whose keys and values are both ScalaPB-generated messages.
    *
    * The settings are built from the two protobuf serializers and then extended with `properties`.
    * On their own the settings carry no broker address, so callers will normally pass at least
    * `"bootstrap.servers"` (e.g. `Map("bootstrap.servers" -> "localhost:9092")`).
    *
    * @param keyCompanion   companion object of the key message type
    * @param valueCompanion companion object of the value message type
    * @param properties     extra Kafka producer properties added to the settings; defaults to none,
    *                       which keeps the behaviour of the two-argument call unchanged
    * @return a `Resource` wrapping the producer built from those settings
    */
  def producerForPb[KeyPB <: scalapb.GeneratedMessage, ValuePB <: scalapb.GeneratedMessage](
    keyCompanion: scalapb.GeneratedMessageCompanion[KeyPB],
    valueCompanion: scalapb.GeneratedMessageCompanion[ValuePB],
    properties: Map[String, String] = Map.empty,
  ): Resource[IO, KafkaProducer[IO, KeyPB, ValuePB]] = {
    val keySerializer = serializerForPb(keyCompanion)
    val valueSerializer = serializerForPb(valueCompanion)
    // Keep the explicit type on the base settings so the serializer types are pinned before
    // the caller-supplied properties are layered on top.
    val baseSettings: ProducerSettings[IO, KeyPB, ValuePB] =
      fs2.kafka.ProducerSettings(keySerializer, valueSerializer)
    fs2.kafka.KafkaProducer.resource(baseSettings.withProperties(properties))
  }

  /** Creates an FS2 Kafka serializer that turns a ScalaPB-generated message into its byte array.
    *
    * @param companion companion object of the message type; its `toByteArray` does the encoding
    * @return a serializer from `PB` to bytes
    */
  def serializerForPb[PB <: scalapb.GeneratedMessage](
    companion: scalapb.GeneratedMessageCompanion[PB],
  ): fs2.kafka.Serializer[IO, PB] = {
    fs2.kafka.Serializer.apply[IO].contramap { pb => companion.toByteArray(pb) }
    // Alternatively, we can serialize (but not deserialize) without the companion object:
    // fs2.kafka.Serializer.apply[IO].contramap { pb => pb.toByteArray }
  }

  /** Creates an FS2 Kafka deserializer that parses bytes into a ScalaPB-generated message.
    *
    * The raw bytes are passed directly to `companion.parseFrom`, so a record without a payload
    * is not handled specially here.
    * NOTE(review): wrap the result with the deserializer's `option` combinator if the topic can
    * contain null values (tombstones) — confirm against the consumer's needs.
    *
    * @param companion companion object of the message type; its `parseFrom` does the decoding
    * @return a deserializer from bytes to `PB`
    */
  def deserializerForPb[PB <: scalapb.GeneratedMessage](
    companion: scalapb.GeneratedMessageCompanion[PB],
  ): fs2.kafka.Deserializer[IO, PB] = {
    fs2.kafka.Deserializer.apply[IO].map { bytes => companion.parseFrom(bytes) }
  }

  /** Creates an FS2 Kafka producer with an optional String key and a ScalaPB-generated value.
    *
    * The settings are built from the optional-string key serializer and the protobuf value
    * serializer, then extended with `properties`. On their own the settings carry no broker
    * address, so callers will normally pass at least `"bootstrap.servers"`
    * (e.g. `Map("bootstrap.servers" -> "localhost:9092")`).
    *
    * @param valueCompanion companion object of the value message type
    * @param properties     extra Kafka producer properties added to the settings; defaults to none,
    *                       which keeps the behaviour of the one-argument call unchanged
    * @return a `Resource` wrapping the producer built from those settings
    */
  def producerForStringKeyPb[ValuePB <: scalapb.GeneratedMessage](
    valueCompanion: scalapb.GeneratedMessageCompanion[ValuePB],
    properties: Map[String, String] = Map.empty,
  ): Resource[IO, KafkaProducer.PartitionsFor[IO, Option[String], ValuePB]] = {
    val keySerializer = fs2.kafka.Serializer.string[IO].option
    val valueSerializer = serializerForPb(valueCompanion)
    // Keep the explicit type on the base settings so the serializer types are pinned before
    // the caller-supplied properties are layered on top.
    val baseSettings: ProducerSettings[IO, Option[String], ValuePB] =
      fs2.kafka.ProducerSettings(keySerializer, valueSerializer)
    fs2.kafka.KafkaProducer.resource(baseSettings.withProperties(properties))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment