Skip to content

Instantly share code, notes, and snippets.

@jeroenr
Last active September 16, 2019 15:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeroenr/2895de32accd440c2558261a49952cab to your computer and use it in GitHub Desktop.
Save jeroenr/2895de32accd440c2558261a49952cab to your computer and use it in GitHub Desktop.
Json serializer / deserializer implementation for Kafka Streams based on Spray Json
sealed trait ApiModel
case class ClickData(
cid: String, // GA client id
timestamp: Option[Long]
) extends ApiModel
object ApiModel extends DefaultJsonProtocol {
implicit val ClickDataFormat = jsonFormat2(ClickData)
}
// Using kafka streams API
builder.stream("clicks", Consumed.`with`(new StringSerde, new JsonSerde[ClickData]))
import akka.util.ByteString
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import spray.json._
/**
* Created by jero on 24/11/17.
*/
class JsonSerializer[T >: Null <: Any : JsonFormat] extends Serializer[T] {
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
override def serialize(topic: String, data: T): Array[Byte] = {
data.toJson.compactPrint.getBytes
}
override def close(): Unit = ()
}
class JsonDeserializer[T >: Null <: Any : JsonFormat] extends Deserializer[T] with Logging {
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def deserialize(topic: String, data: Array[Byte]): T = {
ByteString(data).utf8String.parseJson.convertTo[T]
}
}
class JsonSerde[T >: Null <: Any : JsonFormat] extends Serde[T] {
override def deserializer(): Deserializer[T] = new JsonDeserializer[T]
override def serializer(): Serializer[T] = new JsonSerializer[T]
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment