Skip to content

Instantly share code, notes, and snippets.

@marquesds
Created March 2, 2020 17:55
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marquesds/880c6ed08bee14122523da637513a690 to your computer and use it in GitHub Desktop.
Save marquesds/880c6ed08bee14122523da637513a690 to your computer and use it in GitHub Desktop.
import io.circe.{Decoder, Encoder}
import io.circe.parser.decode
import io.circe.syntax._
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.java.typeutils.TypeExtractor
import scala.reflect.ClassTag
/**
 * Generic Flink schema that encodes and decodes JSON with Circe.
 *
 * Acts as both a `SerializationSchema` and a `DeserializationSchema` for `A`,
 * so a single instance can be handed to sources and sinks alike.
 * JSON text is always read and written as UTF-8, independent of the JVM's
 * platform-default charset.
 *
 * @tparam A       element type; the `ClassTag` is used to recover its runtime class
 *                 for [[getProducedType]]
 * @param decoder  Circe decoder used by [[deserialize]]
 * @param encoder  Circe encoder used by [[serialize]]
 */
case class FlinkCirceSchema[A <: AnyRef : ClassTag]()(implicit decoder: Decoder[A], encoder: Encoder[A])
  extends SerializationSchema[A]
    with DeserializationSchema[A] {

  import java.nio.charset.StandardCharsets
  import org.apache.flink.api.common.typeinfo.TypeInformation

  /**
   * Decodes a UTF-8 JSON payload into an `A`.
   *
   * This is deliberately best-effort: a payload that is `null`, is not valid
   * JSON, or does not match the decoder yields `null` instead of throwing.
   * Flink's `DeserializationSchema` contract documents `null` as the result
   * for a message that cannot be deserialized. Note that the Circe error is
   * discarded, so malformed records are dropped without any diagnostics.
   *
   * @param bytes raw message payload; may be `null`
   * @return the decoded value, or `null` when the payload cannot be decoded
   */
  override def deserialize(bytes: Array[Byte]): A =
    if (bytes == null) null.asInstanceOf[A]
    else
      decode[A](new String(bytes, StandardCharsets.UTF_8)) match {
        case Right(value) => value
        case Left(_) => null.asInstanceOf[A]
      }

  /**
   * Encodes `element` as compact (no whitespace) JSON in UTF-8.
   *
   * @param element value to serialize
   * @return the UTF-8 bytes of the JSON document
   */
  override def serialize(element: A): Array[Byte] =
    element.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)

  /** Always `false`: no element marks the end of the stream. */
  override def isEndOfStream(nextElement: A): Boolean = false

  /**
   * Type information for `A`, derived from the runtime class carried by the `ClassTag`.
   *
   * @return Flink type information for the element type
   */
  override def getProducedType: TypeInformation[A] = {
    // ClassTag only exposes Class[_]; the cast restores the static type Flink's API expects.
    val elementClass = implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
    TypeExtractor.getForClass(elementClass)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment