Skip to content

Instantly share code, notes, and snippets.

@romanskie
Last active June 16, 2020 07:50
Show Gist options
  • Save romanskie/8833755de7ade22599e1c748c96fad11 to your computer and use it in GitHub Desktop.
Save romanskie/8833755de7ade22599e1c748c96fad11 to your computer and use it in GitHub Desktop.
A sample Kafka JSON SerDe written in Scala using the Circe JSON library.
import java.nio.ByteBuffer
import java.util
import io.circe.parser._
import io.circe.syntax._
import io.circe.{Decoder, Encoder, _}
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import scala.util.Try
/**
 * Kafka [[Serde]] that stores values as compact (no-whitespace) JSON using circe.
 *
 * A `null` value is serialized to `null` bytes and `null` bytes are deserialized to a
 * `null` value, so absent payloads pass through untouched in both directions.
 *
 * @tparam T the value type; it must admit `null` and have implicit circe
 *           `Encoder` and `Decoder` instances in scope
 */
final class KafkaJsonSerde[T >: Null: Encoder: Decoder] extends Serde[T] with SerdeDefaults {

  /** Serializer that renders `T` as compact JSON bytes. */
  override val serializer: Serializer[T] = new Serializer[T] with SerdeDefaults {

    /**
     * Encodes `t` as JSON bytes.
     *
     * @param topic the topic the record belongs to (unused)
     * @param t     the value to encode; may be `null`
     * @return the JSON bytes, or `null` when `t` is `null`
     * @throws SerializationException if encoding or printing fails with a non-fatal error
     */
    override def serialize(topic: String, t: T): Array[Byte] =
      Option(t).map { value =>
        Try {
          val buffer: ByteBuffer = Printer.noSpaces.prettyByteBuffer(value.asJson)
          // Copy only the readable region; the backing array may be larger than the payload.
          val payload = new Array[Byte](buffer.remaining)
          buffer.get(payload)
          payload
        }.fold(
          ex => throw new SerializationException("Error serializing JSON message", ex),
          someBytes => someBytes
        )
      }.orNull
  }

  /** Deserializer that parses UTF-8 JSON bytes and decodes them into `T`. */
  override val deserializer: Deserializer[T] = new Deserializer[T] with SerdeDefaults {

    /**
     * Decodes JSON bytes into a `T`.
     *
     * @param topic the topic the record came from (unused)
     * @param bytes the raw payload; may be `null`
     * @return the decoded value, or `null` when `bytes` is `null`
     * @throws SerializationException if the payload is not valid JSON or cannot be decoded as `T`
     */
    override def deserialize(topic: String, bytes: Array[Byte]): T =
      Option(bytes).map { someBytes =>
        // Decode explicitly as UTF-8: `new String(bytes)` would use the JVM's platform
        // default charset and corrupt non-ASCII characters on non-UTF-8 hosts.
        val json = new String(someBytes, java.nio.charset.StandardCharsets.UTF_8)
        decode[T](json).fold(
          ex => throw new SerializationException("Error deserializing JSON message", ex),
          someT => someT
        )
      }.orNull
  }
}
/** Companion providing a factory for [[KafkaJsonSerde]]. */
object KafkaJsonSerde {

  /**
   * Creates a new [[KafkaJsonSerde]] for `T`.
   *
   * @tparam T the value type; it must admit `null` and have implicit circe
   *           `Encoder` and `Decoder` instances in scope
   * @return a fresh serde instance
   */
  def apply[T >: Null: Encoder: Decoder](): KafkaJsonSerde[T] = {
    val serde = new KafkaJsonSerde[T]
    serde
  }
}
/**
 * No-op implementations of the `configure` and `close` lifecycle hooks.
 *
 * Mixed into [[KafkaJsonSerde]] and its serializer and deserializer, none of which
 * hold configuration or resources.
 */
trait SerdeDefaults {

  /**
   * Accepts configuration and ignores it.
   *
   * @param configs configuration entries (unused)
   * @param isKey   whether the instance is used for record keys (unused)
   */
  def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Does nothing; there is nothing to release. */
  def close(): Unit = {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment