Last active
September 16, 2019 15:59
-
-
Save jeroenr/2895de32accd440c2558261a49952cab to your computer and use it in GitHub Desktop.
Json serializer / deserializer implementation for Kafka Streams based on Spray Json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sealed trait ApiModel | |
case class ClickData( | |
cid: String, // GA client id | |
timestamp: Option[Long] | |
) extends ApiModel | |
object ApiModel extends DefaultJsonProtocol { | |
implicit val ClickDataFormat = jsonFormat2(ClickData) | |
} | |
// Using kafka streams API | |
builder.stream("clicks", Consumed.`with`(new StringSerde, new JsonSerde[ClickData])) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.util.ByteString | |
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer} | |
import spray.json._ | |
/** | |
* Created by jero on 24/11/17. | |
*/ | |
class JsonSerializer[T >: Null <: Any : JsonFormat] extends Serializer[T] { | |
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () | |
override def serialize(topic: String, data: T): Array[Byte] = { | |
data.toJson.compactPrint.getBytes | |
} | |
override def close(): Unit = () | |
} | |
class JsonDeserializer[T >: Null <: Any : JsonFormat] extends Deserializer[T] with Logging { | |
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () | |
override def close(): Unit = () | |
override def deserialize(topic: String, data: Array[Byte]): T = { | |
ByteString(data).utf8String.parseJson.convertTo[T] | |
} | |
} | |
class JsonSerde[T >: Null <: Any : JsonFormat] extends Serde[T] { | |
override def deserializer(): Deserializer[T] = new JsonDeserializer[T] | |
override def serializer(): Serializer[T] = new JsonSerializer[T] | |
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () | |
override def close(): Unit = () | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment