Skip to content

Instantly share code, notes, and snippets.

@imduffy15
Created August 17, 2017 13:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save imduffy15/b48ebc18ddc1cf61237be481b0896967 to your computer and use it in GitHub Desktop.
Save imduffy15/b48ebc18ddc1cf61237be481b0896967 to your computer and use it in GitHub Desktop.
import java.io.{ ByteArrayInputStream, DataInputStream }
import java.util
import scala.collection.mutable.ListBuffer
import scala.util.{ Failure, Success, Try }
import org.apache.kafka.common.serialization.Deserializer
class ListDeserializer[T](valueDeserializer: Deserializer[T]) extends Deserializer[List[T]] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {
}
override def close(): Unit = {
}
override def deserialize(topic: String, data: Array[Byte]): List[T] = {
val dataInputStream = new DataInputStream(new ByteArrayInputStream(data))
Try({
val buffer = ListBuffer[T]()
val records = dataInputStream.readInt()
for (i <- 0 to records) {
val valueBytes = new Array[Byte](dataInputStream.readInt())
dataInputStream.read(valueBytes)
val value: T = valueDeserializer.deserialize(topic, valueBytes)
buffer += value
}
buffer.toList
}) match {
case Success(list: List[T]) =>
list
case Failure(ex) =>
throw ex
}
}
}
import java.util
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serdes, Serializer }
class ListSerde[T](
innerValueSerde: Serde[T]) extends Serde[List[T]] {
private val listSerde = Serdes.serdeFrom(
new ListSerializer(innerValueSerde.serializer()),
new ListDeserializer(innerValueSerde.deserializer()))
override def deserializer(): Deserializer[List[T]] = listSerde.deserializer()
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {
listSerde.serializer().configure(configs, isKey)
listSerde.deserializer().configure(configs, isKey)
}
override def close(): Unit = {
listSerde.serializer().close()
listSerde.deserializer().close()
}
override def serializer(): Serializer[List[T]] = listSerde.serializer()
}
import java.io.{ ByteArrayOutputStream, DataOutputStream }
import java.util
import org.apache.kafka.common.serialization.Serializer
class ListSerializer[T](valueSerializer: Serializer[T]) extends Serializer[List[T]] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {
}
override def serialize(topic: String, data: List[T]): Array[Byte] = {
val byteArrayOutputStream = new ByteArrayOutputStream()
val output = new DataOutputStream(byteArrayOutputStream)
data.foreach((value) => {
val bytes = valueSerializer.serialize(topic, value)
output.writeInt(bytes.length)
output.write(bytes)
})
byteArrayOutputStream.toByteArray
}
override def close(): Unit = {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment