Skip to content

Instantly share code, notes, and snippets.

@scf37
Created March 11, 2021 09:08
Show Gist options
  • Save scf37/856dfcfb429b681e370c288f575b6eb7 to your computer and use it in GitHub Desktop.
Save scf37/856dfcfb429b681e370c288f575b6eb7 to your computer and use it in GitHub Desktop.
import java.io._
/**
* Schema for efficient binary serialization of case classes
* Implementation must:
* - declare all fields using `required` and `optional` methods
* - implement doRead, transferring field values from declaration to case class instance
* - assign every field a unique tag
*
* Protocol.
* Required fields are sorted by tag and serialized first, without any additional metadata
* Optional fields are prefixed by tag number, None is serialized by writing nothing at all.
*
* Compatibility.
* Backward-compatible changes are:
* - adding optional fields
* - re-arranging field declarations w/o changing tags
* Backward-incompatible changes are:
* - adding or removing required fields
* - removing optional fields
* - changing item tags
*
* @tparam A
*/
trait BinarySchema[A] extends BinaryCodec[A] {

  /**
   * Handle to a declared field. Invoking it inside `doRead` yields the value decoded for that field.
   *
   * @tparam B type of the field value
   */
  sealed trait Field[B] {
    /**
     * Extract field value for ReadContext during deserialization
     * @param ctx context
     * @return field value
     */
    def apply()(implicit ctx: ReadContext): B
  }

  /** Carrier of the values decoded by `read`; handed implicitly to `doRead`. */
  trait ReadContext

  /**
   * Declare required field. Required fields are written without a tag prefix, so they are smaller
   * on the wire than optional ones, but have restricted compatibility.
   *
   * @param get value extractor
   * @param tag tag, must be unique among all fields of this schema
   * @tparam B field value type
   * @return field
   */
  def required[B: BinaryCodec](get: A => B, tag: Int): Field[B] = {
    val field = FieldImpl[B](tag, true, (a, os) => BinaryCodec[B].write(get(a), os), BinaryCodec[B].read)
    fields = fields :+ field
    field
  }

  /**
   * Declare optional field. `None` is serialized by writing nothing at all; `Some(b)` is written
   * as the tag followed by the encoded value.
   *
   * @param get value extractor
   * @param tag tag, must be unique among all fields of this schema
   * @tparam B type of the wrapped value
   * @return field
   */
  def optional[B: BinaryCodec](get: A => Option[B], tag: Int): Field[Option[B]] = {
    val field = FieldImpl[Option[B]](
      tag,
      false,
      (a, os) =>
        get(a).foreach { b =>
          BinaryCodec[Int].write(tag, os)
          BinaryCodec[B].write(b, os)
        },
      is => Some(BinaryCodec[B].read(is))
    )
    fields = fields :+ field
    field
  }

  /**
   * Create case class instance from read context
   * @param ctx context
   * @return assembled case class
   */
  protected def doRead(implicit ctx: ReadContext): A

  // Every field declared so far, in declaration order.
  private var fields = Vector.empty[FieldImpl[_]]

  // Lazily built lookup structures. `requiredSchema` doubles as the "initialized" flag,
  // which is why initSchema() assigns it last.
  @volatile
  private var requiredSchema: List[FieldImpl[_]] = null
  @volatile
  private var optionalSchema: Map[Int, FieldImpl[_]] = null
  @volatile
  private var defaults: Map[Int, Any] = null

  /**
   * Process collected fields to speed up further r/w operations.
   *
   * Validation runs before anything is published, so a schema with clashing tags fails on
   * every read/write instead of only on the first one.
   *
   * @throws IllegalStateException if two fields (required or optional) share a tag
   */
  private def initSchema(): Unit = {
    if (requiredSchema == null) {
      val declared = fields
      // Tags key the decoded-value map for required and optional fields alike,
      // so they must be unique across both groups.
      if (declared.map(_.tag).distinct.size != declared.size)
        throw new IllegalStateException("Duplicate tags detected")

      val optionalByTag: Map[Int, FieldImpl[_]] =
        declared.filterNot(_.required).map(f => f.tag -> f).toMap
      val noneByTag: Map[Int, Any] = optionalByTag.keys.map(tag => tag -> None).toMap

      optionalSchema = optionalByTag
      defaults = noneByTag
      // Published last: a thread that observes a non-null requiredSchema
      // also observes the two volatile writes above.
      requiredSchema = declared.filter(_.required).sortBy(_.tag).toList
    }
  }

  /**
   * Internal representation of a declared field.
   *
   * @param tag      tag identifying the field
   * @param required true for fields declared via `required`, false for `optional`
   * @param write    serializes this field of the given instance to the stream
   * @param read     deserializes the field value from the stream
   */
  private case class FieldImpl[B](
    tag: Int,
    required: Boolean,
    write: (A, DataOutputStream) => Unit,
    read: DataInputStream => B
  ) extends Field[B] {
    // The context is always the ReadContextImpl built by `read`; values are stored untyped by tag.
    override def apply()(implicit ctx: ReadContext): B =
      ctx.asInstanceOf[ReadContextImpl].data(tag).asInstanceOf[B]
  }

  /** Decoded field values keyed by tag. */
  private case class ReadContextImpl(data: Map[Int, Any]) extends ReadContext

  /**
   * Decode an instance: required fields first in ascending tag order, then tagged optional
   * fields for as long as the stream reports available bytes.
   *
   * NOTE: the end of the record is detected via `is.available()`, so every byte remaining in
   * the stream is interpreted as an optional field of this record.
   *
   * @param is stream positioned at the start of the record
   * @return instance assembled by `doRead`
   * @throws IllegalArgumentException if an unknown optional tag is encountered
   * @throws IllegalStateException    if the schema declares duplicate tags
   */
  override def read(is: DataInputStream): A = {
    initSchema()
    // Start from None for every optional field; entries are overwritten as values are decoded.
    var data = defaults
    requiredSchema.foreach { f =>
      data += f.tag -> f.read(is)
    }
    while (is.available() > 0) {
      val tag = BinaryCodec[Int].read(is)
      val field = optionalSchema
        .getOrElse(tag, throw new IllegalArgumentException(s"Unknown tag in input stream: $tag"))
      data += tag -> field.read(is)
    }
    doRead(ReadContextImpl(data))
  }

  /**
   * Encode an instance: required fields in ascending tag order, followed by the optional
   * fields that are present (each prefixed with its tag).
   *
   * @param a  instance to serialize
   * @param os destination stream
   * @throws IllegalStateException if the schema declares duplicate tags
   */
  override def write(a: A, os: DataOutputStream): Unit = {
    initSchema()
    requiredSchema.foreach(_.write(a, os))
    // Order among optional fields is the map's iteration order; `read` accepts any order.
    optionalSchema.values.foreach(_.write(a, os))
  }
}
/**
* Binary codec, capable of encoding and decoding A into/from bytes
* @tparam A
*/
trait BinaryCodec[A] {
  /**
   * Decode one value from the stream.
   *
   * @param is stream positioned at the first byte of the encoded value
   * @return decoded value
   */
  def read(is: DataInputStream): A

  /**
   * Encode one value to the stream.
   *
   * @param a  value to encode
   * @param os destination stream
   */
  def write(a: A, os: DataOutputStream): Unit
}

/** Built-in codecs and the varint primitives they share. */
object BinaryCodec {
  /** Summon the implicit codec for `A`. */
  def apply[A: BinaryCodec]: BinaryCodec[A] = implicitly

  // Upper bound for the initial StringBuilder capacity when decoding strings, so that a
  // corrupt length prefix cannot trigger a huge up-front allocation.
  private val MaxInitialStringCapacity = 1 << 16

  /**
   * Write `value` as a little-endian base-128 varint: 7 payload bits per byte, high bit set
   * on every byte except the last. The value is treated as unsigned (logical shift).
   */
  private def writeVarint(value: Long, os: OutputStream): Unit = {
    var rest = value
    while ((rest & ~0x7FL) != 0) {
      os.write(((rest & 0x7F) | 0x80).toInt)
      rest >>>= 7
    }
    os.write(rest.toInt)
  }

  /**
   * Read a varint produced by `writeVarint`.
   *
   * @param is      source stream
   * @param maxBits a byte is rejected once its shift offset reaches this value, i.e. at most
   *                ceil(maxBits / 7) bytes are accepted
   * @return decoded value
   * @throws IllegalArgumentException on premature end of stream or when the varint is too long
   */
  private def readVarint(is: InputStream, maxBits: Int): Long = {
    var result = 0L
    var shift = 0
    var more = true
    while (more) {
      val b = is.read()
      if (b == -1) throw new IllegalArgumentException("Unexpected eof when reading varint")
      if (shift >= maxBits) throw new IllegalArgumentException("Variable length quantity is too long")
      result |= (b & 0x7FL) << shift
      shift += 7
      more = (b & 0x80) != 0
    }
    result
  }

  /**
   * Read a collection/string length. Accepts every non-negative Int, which is exactly the
   * range the writers can emit.
   *
   * @throws IllegalArgumentException if the encoded length does not fit into a non-negative Int
   */
  private def readLength(is: InputStream): Int = {
    val len = readVarint(is, 32)
    if (len > Int.MaxValue) throw new IllegalArgumentException(s"Invalid length in input stream: $len")
    len.toInt
  }

  /** Single raw byte. */
  implicit val byteCodec: BinaryCodec[Byte] = new BinaryCodec[Byte] {
    override def read(is: DataInputStream): Byte = is.readByte()
    override def write(a: Byte, os: DataOutputStream): Unit = os.writeByte(a)
  }

  /** Int as a varint of its unsigned 32-bit pattern (1 to 5 bytes). */
  implicit val intCodec: BinaryCodec[Int] = new BinaryCodec[Int] {
    override def read(is: DataInputStream): Int = readVarint(is, 32).toInt
    // Mask instead of sign-extending: a sign-extended negative Int would take 10 bytes,
    // which the 32-bit reader above rejects.
    override def write(a: Int, os: DataOutputStream): Unit = writeVarint(a & 0xFFFFFFFFL, os)
  }

  /** Long as a varint of its unsigned 64-bit pattern (1 to 10 bytes). */
  implicit val longCodec: BinaryCodec[Long] = new BinaryCodec[Long] {
    override def read(is: DataInputStream): Long = readVarint(is, 64)
    override def write(a: Long, os: DataOutputStream): Unit = writeVarint(a, os)
  }

  /** String as a varint length (in UTF-16 code units) followed by one varint per code unit. */
  implicit val stringCodec: BinaryCodec[String] = new BinaryCodec[String] {
    override def read(is: DataInputStream): String = {
      val len = readLength(is)
      val sb = new java.lang.StringBuilder(math.min(len, MaxInitialStringCapacity))
      var i = 0
      while (i < len) {
        sb.append(readVarint(is, 16).toChar)
        i += 1
      }
      sb.toString
    }

    override def write(a: String, os: DataOutputStream): Unit = {
      val len = a.length
      writeVarint(len, os)
      var i = 0
      while (i < len) {
        writeVarint(a.charAt(i), os)
        i += 1
      }
    }
  }

  /** Seq as a varint element count followed by the elements encoded with the codec of `A`. */
  implicit def seqCodec[A: BinaryCodec]: BinaryCodec[Seq[A]] = new BinaryCodec[Seq[A]] {
    override def read(is: DataInputStream): Seq[A] = {
      val len = readLength(is)
      val elements = Vector.newBuilder[A]
      var i = 0
      while (i < len) {
        elements += BinaryCodec[A].read(is)
        i += 1
      }
      elements.result()
    }

    override def write(a: Seq[A], os: DataOutputStream): Unit = {
      writeVarint(a.length, os)
      a.foreach(e => BinaryCodec[A].write(e, os))
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment