Skip to content

Instantly share code, notes, and snippets.

@jeroenr
Created February 20, 2018 09:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeroenr/8b0cc0a4ce3b4d521de28267867bc003 to your computer and use it in GitHub Desktop.
Save jeroenr/8b0cc0a4ce3b4d521de28267867bc003 to your computer and use it in GitHub Desktop.
Example of case class serializer and deserializer for Avro
/**
 * Kafka `Deserializer` that turns Avro-encoded bytes into instances of `CC`.
 *
 * The payload is first decoded into a `GenericRecord` by a Confluent
 * `KafkaAvroDeserializer` (generic reader, schema registry URL taken from
 * `Config`) and then converted to `CC` through the implicit `RecordFormat`.
 *
 * @param isKey  `true` when this instance decodes record keys, `false` for values;
 *               forwarded to the underlying Avro deserializer
 * @param format converter between `GenericRecord` and `CC`
 * @tparam CC    the type produced by this deserializer
 */
class CaseClassDeserializer[CC](isKey: Boolean)(implicit format: RecordFormat[CC]) extends Deserializer[CC] with Logging {

  // Configure through `configure(props, isKey)` rather than the (client, props)
  // constructor: the constructor has no way to receive `isKey`, so the flag
  // given to this class would otherwise be silently dropped.
  private val deserializer = {
    val underlying = new KafkaAvroDeserializer()
    underlying.configure(Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> Config.tracktivity.kafka.connect.`schema-registry-url`,
      KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> "false"
    ).asJava, isKey)
    underlying
  }

  /** Intentionally a no-op: the underlying deserializer is configured at construction time. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

  /** Closes the underlying Avro deserializer. */
  override def close(): Unit = deserializer.close()

  /**
   * Decodes `data` read from `topic` into a `CC`.
   *
   * @param topic topic the payload was read from
   * @param data  raw payload; may be `null`
   * @return the decoded value, or `null` when `data` is `null` or the Avro
   *         deserializer yields no record
   * @throws IllegalStateException if no `RecordFormat` instance is available
   */
  override def deserialize(topic: String, data: Array[Byte]): CC =
    if (data == null) null.asInstanceOf[CC]
    else {
      // An implicit resolved from a not-yet-initialised val can be null; fail with a
      // clear message instead of an anonymous NullPointerException further down.
      if (format == null)
        throw new IllegalStateException(s"No RecordFormat available to deserialize records from topic $topic")

      deserializer.deserialize(topic, data).asInstanceOf[GenericRecord] match {
        case null =>
          // Previously this case was logged and then dereferenced anyway.
          // Treat it like a null payload so callers see a null value instead.
          logger.info(s"Avro deserializer returned null for a ${data.length}-byte payload from topic $topic; returning null")
          null.asInstanceOf[CC]
        case record =>
          format.from(record)
      }
    }
}
/**
 * Kafka `Serializer` that writes instances of `CC` as Avro.
 *
 * Values are converted to an Avro record with the implicit `RecordFormat` and
 * then encoded by a Confluent `KafkaAvroSerializer` (schema registry URL taken
 * from `Config`).
 *
 * @param isKey  `true` when this instance encodes record keys, `false` for values;
 *               forwarded to the underlying Avro serializer
 * @param format converter between `CC` and Avro records
 * @tparam CC    the type accepted by this serializer
 */
class CaseClassSerializer[CC](isKey: Boolean)(implicit format: RecordFormat[CC]) extends Serializer[CC] {

  // Configure through `configure(props, isKey)` rather than the (client, props)
  // constructor: the constructor cannot receive `isKey`, so a key serializer
  // would otherwise behave like a value serializer (per Confluent's default
  // subject naming this affects which registry subject the schema goes under).
  private val serializer = {
    val underlying = new KafkaAvroSerializer()
    underlying.configure(Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> Config.tracktivity.kafka.connect.`schema-registry-url`
    ).asJava, isKey)
    underlying
  }

  /** Intentionally a no-op: the underlying serializer is configured at construction time. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

  /** Closes the underlying Avro serializer. */
  override def close(): Unit = serializer.close()

  /**
   * Encodes `data` for `topic`.
   *
   * @param topic topic the payload will be written to
   * @param data  value to encode; may be `null`
   * @return the Avro-encoded bytes, or `null` when `data` is `null`, mirroring
   *         how `CaseClassDeserializer` maps a null payload to a null value
   */
  override def serialize(topic: String, data: CC): Array[Byte] =
    if (data == null) null // pass nulls through instead of handing null to the record format
    else serializer.serialize(topic, format.to(data))
}
/**
 * Kafka `Serde` pairing a `CaseClassSerializer` with a `CaseClassDeserializer`
 * for the same type `CC`.
 *
 * One serializer and one deserializer are created per serde and reused for
 * every call. Previously each call to `serializer()` / `deserializer()` built a
 * fresh instance that `close()` never released.
 *
 * @param isKey  `true` when the serde is used for record keys, `false` for values
 * @param format converter between `CC` and Avro records
 * @tparam CC    the type handled by this serde
 */
class CaseClassSerde[CC](isKey: Boolean)(implicit format: RecordFormat[CC]) extends Serde[CC] {

  private val caseClassSerializer: Serializer[CC] = new CaseClassSerializer[CC](isKey)
  private val caseClassDeserializer: Deserializer[CC] = new CaseClassDeserializer[CC](isKey)

  /** Returns the deserializer owned by this serde. */
  override def deserializer(): Deserializer[CC] = caseClassDeserializer

  /** Returns the serializer owned by this serde. */
  override def serializer(): Serializer[CC] = caseClassSerializer

  /** Intentionally a no-op: both halves are configured when they are constructed. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

  /** Closes the serializer and deserializer owned by this serde. */
  override def close(): Unit = {
    caseClassSerializer.close()
    caseClassDeserializer.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment