Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created November 22, 2021 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Daenyth/e6118cca08ac56e8382b535d1d25676c to your computer and use it in GitHub Desktop.
Save Daenyth/e6118cca08ac56e8382b535d1d25676c to your computer and use it in GitHub Desktop.
fs2-kafka vulcan => avro Serde helper
import cats.effect.Sync
import cats.syntax.all._
import cats.effect.std.Dispatcher
import fs2.kafka.vulcan.{AvroSettings, avroDeserializer, avroSerializer}
import fs2.kafka.{Deserializer, Headers, Serializer}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import vulcan.Codec
// Credit to Fede Fernández for the original CE2 version
// Note: this isn't under an open source license; it's for educational purposes only
object VulcanHelper {
def serde[F[_], A](
serializer: Serializer[F, A],
deserializer: Deserializer[F, A],
disp: Dispatcher[F]
): Serde[A] =
Serdes.serdeFrom(
new org.apache.kafka.common.serialization.Serializer[A] {
override def serialize(topic: String, data: A): Array[Byte] =
if (data == null) Array.empty
else
disp.unsafeRunSync(serializer.serialize(topic, Headers.empty, data))
},
new org.apache.kafka.common.serialization.Deserializer[A] {
override def deserialize(topic: String, data: Array[Byte]): A =
if (data == null || data.isEmpty) null.asInstanceOf[A]
else
disp.unsafeRunSync(deserializer.deserialize(topic, Headers.empty, data))
}
)
def keySerde[F[_]: Sync, A: Codec](
avroSettings: AvroSettings[F],
disp: Dispatcher[F]
): F[Serde[A]] =
(
avroSerializer[A].using(avroSettings).forKey,
avroDeserializer[A].using(avroSettings).forKey
).mapN(serde[F, A](_, _, disp))
def valueSerde[F[_]: Sync, A: Codec](
avroSettings: AvroSettings[F],
disp: Dispatcher[F]
): F[Serde[A]] =
(
avroSerializer[A].using(avroSettings).forValue,
avroDeserializer[A].using(avroSettings).forValue
).mapN(serde[F, A](_, _, disp))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment