Skip to content

Instantly share code, notes, and snippets.

@kdrakon
Last active December 16, 2022 15:29
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kdrakon/618f1312f2b96d469492568f8d56e036 to your computer and use it in GitHub Desktop.
Save kdrakon/618f1312f2b96d469492568f8d56e036 to your computer and use it in GitHub Desktop.
Using Avro4s with Confluent Kafka Avro Serializer + Schema Registry
import java.util
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
object Avro4s {
implicit class CaseClassSerde(inner: Serde[GenericRecord]) {
def forCaseClass[T](implicit recordFormat: RecordFormat[T]): Serde[T] = {
val caseClassSerializer: Serializer[T] = new Serializer[T] {
override def serialize(topic: String, data: T): Array[Byte] = inner.serializer().serialize(topic, recordFormat.to(data))
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
}
val caseClassDeserializer: Deserializer[T] = new Deserializer[T] {
override def deserialize(topic: String, data: Array[Byte]): T = recordFormat.from(inner.deserializer().deserialize(topic, data))
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
}
new Serde[T] {
override def serializer(): Serializer[T] = caseClassSerializer
override def deserializer(): Deserializer[T] = caseClassDeserializer
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
}
}
}
}
object Example {
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import scala.collection.JavaConverters._
import Avro4s.CaseClassSerde
val confluentGenericSerde = new GenericAvroSerde()
val config: java.util.Map[String, _] = Map(("schema.registry.url", "https://localhost:8081")).asJava
confluentGenericSerde.configure(config, false)
case class Foo(i: Int, s: String)
val confluentGenericSerdeWrapped = confluentGenericSerde.forCaseClass[Foo]
confluentGenericSerdeWrapped.serializer().serialize("foo", Foo(1, "Baz"))
}
@kdrakon
Copy link
Author

kdrakon commented Aug 14, 2018

In Kafka 2.0.0, Avro4s.scala can become as simple as this:

import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

object Avro4s {
  implicit class CaseClassSerde(inner: Serde[GenericRecord]) {
    def forCaseClass[T >: Null](implicit recordFormat: RecordFormat[T]): Serde[T] = {
      Serdes.fromFn(
        (topic, data) => inner.serializer().serialize(topic, recordFormat.to(data)),
        (topic, bytes) => Option(recordFormat.from(inner.deserializer().deserialize(topic, bytes)))
      )
    }
  }
}

@afsalthaj
Copy link

Interesting.
One thing that you could point out is, instead of implicit RecordFormat, form a RecordFormat explicitly by RecordFormat.format(generatedCaseClass.getSchema) instead of delegating that job to avro4s which might do things different to confluent. schema is accessble if you run the task avroScalaGenerateSpecific in sbt-avro-hugger.

@PedroGarci4
Copy link

which is the dependency for this lib? io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment