Created
August 17, 2017 13:50
-
-
Save imduffy15/b48ebc18ddc1cf61237be481b0896967 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{ ByteArrayInputStream, DataInputStream } | |
import java.util | |
import scala.collection.mutable.ListBuffer | |
import scala.util.{ Failure, Success, Try } | |
import org.apache.kafka.common.serialization.Deserializer | |
class ListDeserializer[T](valueDeserializer: Deserializer[T]) extends Deserializer[List[T]] { | |
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { | |
} | |
override def close(): Unit = { | |
} | |
override def deserialize(topic: String, data: Array[Byte]): List[T] = { | |
val dataInputStream = new DataInputStream(new ByteArrayInputStream(data)) | |
Try({ | |
val buffer = ListBuffer[T]() | |
val records = dataInputStream.readInt() | |
for (i <- 0 to records) { | |
val valueBytes = new Array[Byte](dataInputStream.readInt()) | |
dataInputStream.read(valueBytes) | |
val value: T = valueDeserializer.deserialize(topic, valueBytes) | |
buffer += value | |
} | |
buffer.toList | |
}) match { | |
case Success(list: List[T]) => | |
list | |
case Failure(ex) => | |
throw ex | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util | |
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serdes, Serializer } | |
class ListSerde[T]( | |
innerValueSerde: Serde[T]) extends Serde[List[T]] { | |
private val listSerde = Serdes.serdeFrom( | |
new ListSerializer(innerValueSerde.serializer()), | |
new ListDeserializer(innerValueSerde.deserializer())) | |
override def deserializer(): Deserializer[List[T]] = listSerde.deserializer() | |
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { | |
listSerde.serializer().configure(configs, isKey) | |
listSerde.deserializer().configure(configs, isKey) | |
} | |
override def close(): Unit = { | |
listSerde.serializer().close() | |
listSerde.deserializer().close() | |
} | |
override def serializer(): Serializer[List[T]] = listSerde.serializer() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{ ByteArrayOutputStream, DataOutputStream } | |
import java.util | |
import org.apache.kafka.common.serialization.Serializer | |
class ListSerializer[T](valueSerializer: Serializer[T]) extends Serializer[List[T]] { | |
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { | |
} | |
override def serialize(topic: String, data: List[T]): Array[Byte] = { | |
val byteArrayOutputStream = new ByteArrayOutputStream() | |
val output = new DataOutputStream(byteArrayOutputStream) | |
data.foreach((value) => { | |
val bytes = valueSerializer.serialize(topic, value) | |
output.writeInt(bytes.length) | |
output.write(bytes) | |
}) | |
byteArrayOutputStream.toByteArray | |
} | |
override def close(): Unit = { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment