Skip to content

Instantly share code, notes, and snippets.

@kailuowang
Last active November 4, 2019 13:55
Show Gist options
  • Save kailuowang/fc9974f66113e2362dac to your computer and use it in GitHub Desktop.
Save kailuowang/fc9974f66113e2362dac to your computer and use it in GitHub Desktop.
Avro deserializer for Flink's Data Stream API Kafka Source
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
/**
 * Deserialization schema that decodes Avro binary-encoded byte arrays into instances of
 * the specific record type `T`.
 *
 * @param baselineMessage factory for empty `T` instances. One fresh instance is requested
 *                        per decoded message and filled in by the Avro reader; the schema
 *                        of such an instance is used as the reader schema.
 * @tparam T the Avro specific record type produced by this schema; its `ClassTag` is used
 *           to report the produced type information.
 */
class AvroDeserializationSchema[T <: SpecificRecordBase: ClassTag](private val baselineMessage: () ⇒ T) extends DeserializationSchema[T] {

  // Built once on first use instead of once per message: the reader only depends on the
  // record schema. This assumes every instance returned by `baselineMessage` reports the
  // same schema (expected for a single specific record type — confirm if the factory can
  // vary). Marked @transient so the reader is never captured if this schema object is
  // serialized; it is simply rebuilt lazily afterwards.
  @transient private lazy val reader: DatumReader[T] =
    new SpecificDatumReader[T](baselineMessage().getSchema)

  /**
   * Decodes one Avro binary-encoded message.
   *
   * @param message the raw bytes of a single encoded record
   * @return a newly created `T` populated from `message`
   */
  override def deserialize(message: Array[Byte]): T = {
    // A fresh target per message, so records handed downstream never share state.
    val target = baselineMessage()
    // `null` means no existing decoder is offered for reuse.
    val decoder = DecoderFactory.get.binaryDecoder(message, null)
    reader.read(target, decoder)
  }

  /** Always `false`: no element marks the end of the stream. */
  override def isEndOfStream(nextElement: T): Boolean = false

  /**
   * Reports the type information for `T`, derived from the runtime class carried by the
   * implicit `ClassTag`.
   */
  override def getProducedType(): TypeInformation[T] = {
    // ClassTag only exposes Class[_]; the cast is safe because the tag was summoned for T.
    val recordClass = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
    TypeExtractor.getForClass(recordClass)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment