Skip to content

Instantly share code, notes, and snippets.

@ajaychandran
Last active August 29, 2015 14:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajaychandran/a179c519b4c45753a334 to your computer and use it in GitHub Desktop.
Save ajaychandran/a179c519b4c45753a334 to your computer and use it in GitHub Desktop.
scodec: FilterCodec
package scodec.codecs
import scodec._
import scodec.bits.{ByteVector, BitVector}
/**
 * Factory methods for checksum codecs.
 *
 * Every codec built here is a `Codec[BitVector]` whose two halves are deliberately asymmetric:
 * encoding turns a range of bits into the checksum of that range, while decoding extracts the
 * range of bits (a prefix of the input) that a checksum is expected to cover.
 */
object ChecksumCodec {

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * The size produced by `range` is measured from the start of the input handed to the decoder,
   * i.e. it has to account for any bits `range` itself reads.
   *
   * @param encoder encodes a bit-range to a bit-checksum
   * @param range decodes the size of a bit-range
   * @return a codec whose decoded value is the leading bit-range and whose remainder is everything after it
   */
  def apply(encoder: Encoder[BitVector], range: Decoder[Long]): Codec[BitVector] =
    Codec(encoder, Decoder(
      bits => range.decode(bits).flatMap(
        size => bits.consumeThen(size.value)(
          e => Attempt.failure(Err.InsufficientBits(size.value, bits.size, List(e))),
          (covered, remainder) => Attempt.successful(DecodeResult(covered, remainder))))))

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * @param encoder encodes a bit-range to a bit-checksum
   * @param range decodes the (un-padded) size of a bit-range
   * @param padding number of bits added to the size decoded by `range`
   * @return a codec whose decoded bit-range spans the decoded size plus `padding` bits
   */
  def apply(encoder: Encoder[BitVector], range: Decoder[Long], padding: Long): Codec[BitVector] =
    apply(encoder, range map (_ + padding))

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * Byte-oriented variant: sizes are given in bytes and converted to bits (multiplied by 8).
   *
   * @param encoder encodes a byte-range to a byte-checksum
   * @param range decodes the (un-padded) size of a byte-range
   * @param padding size padding for the byte-range, in bytes
   * @return a codec whose decoded bit-range spans `8 * (decoded size + padding)` bits
   */
  def apply(encoder: Encoder[ByteVector], range: Decoder[Int], padding: Int): Codec[BitVector] =
    apply(encoder.contramap[BitVector](_.bytes), range.map(8L * _), 8L * padding)

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * @param length the bit-length of the checksum, reported as the exact size bound of the encoder
   * @param f computes bit-checksum
   * @param range decodes the (un-padded) size of a bit-range
   * @param padding size padding for the bit-range
   * @return a codec that encodes with `f` and decodes the padded bit-range
   */
  def apply(length: Long, f: BitVector => BitVector, range: Decoder[Long], padding: Long): Codec[BitVector] =
    apply(new Encoder[BitVector] {
      def encode(value: BitVector): Attempt[BitVector] = Attempt.successful(f(value))
      def sizeBound: SizeBound = SizeBound.exact(length)
    }, range, padding)

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * Byte-oriented variant: `length`, the decoded size and `padding` are all given in bytes and
   * converted to bits by multiplying by 8, matching the other byte-based overloads.
   *
   * @param length the byte-length of the checksum
   * @param f computes byte-checksum
   * @param range decodes the (un-padded) size of a byte-range
   * @param padding size padding for the byte-range, in bytes
   * @return a codec that encodes with `f` and decodes the padded bit-range
   */
  def apply(length: Int, f: ByteVector => ByteVector, range: Decoder[Int], padding: Int): Codec[BitVector] =
    // Bytes -> bits is a multiplication; this previously added 8 to the decoded size instead.
    apply(8L * length, (bits: BitVector) => f(bits.bytes).bits, range.map(8L * _), 8L * padding)

  /**
   * Returns a codec that encodes a bit-range to an XORed bit-checksum and decodes bits to a bit-range.
   *
   * The checksum is obtained by splitting the range into groups of `length` bits and XOR-folding
   * them, starting from `length` low bits.
   *
   * NOTE(review): the fold assumes every group produced by `grouped(length)` is `length` bits wide.
   * When the range size is not a multiple of `length` the final group is shorter -- confirm how
   * `xor` treats operands of different sizes before relying on such ranges.
   *
   * @param length the bit-length of the checksum
   * @param range decodes the (un-padded) size of a bit-range
   * @param padding size padding for the bit-range
   * @return a codec that encodes the XOR checksum and decodes the padded bit-range
   */
  def xor(length: Long, range: Decoder[Long], padding: Long): Codec[BitVector] =
    apply(length, (bits: BitVector) => bits.grouped(length).foldLeft(BitVector.low(length))(_ xor _), range, padding)

  /**
   * Returns a codec that encodes a bit-range to an XORed bit-checksum and decodes bits to a bit-range.
   *
   * Byte-oriented variant of the bit-based `xor`; all sizes are multiplied by 8.
   *
   * @param length the byte-length of the checksum
   * @param range decodes the (un-padded) size of a byte-range
   * @param padding size padding for the byte-range, in bytes
   * @return a codec that encodes the XOR checksum and decodes the padded bit-range
   */
  def xor(length: Int, range: Decoder[Int], padding: Int): Codec[BitVector] =
    xor(8L * length, range.map(8L * _), 8L * padding)

  /**
   * Error describing a checksum that does not match the one computed from the covered bits.
   *
   * @param bits the bit-range the checksum was computed over
   * @param expected the checksum computed from `bits`
   * @param actual the checksum bits that were actually present
   * @param context error context stack, most recently pushed entry first
   */
  case class Mismatch(bits: BitVector, expected: BitVector, actual: BitVector, context: List[String] = Nil) extends Err {
    def message: String = s"checksum mismatch for bits: $bits, expected: $expected, actual: $actual"
    def pushContext(ctx: String): Err = copy(context = ctx :: context)
  }
}
package scodec.codecs
import scodec._
import scodec.bits._
/**
 * Specification for codecs assembled with `checksummed` on top of [[ChecksumCodec]].
 */
class ChecksumCodecTest extends CodecSuite {

  "checksummed codec" should {

    // Length-prefixed UTF-8 string guarded by a one-byte XOR checksum; the range decoder reads
    // the int32 length and is padded by 4 bytes.
    val codec = checksummed(ChecksumCodec.xor(1, int32, 4), variableSizeBytes(int32, utf8))

    // "hello world" framed with its length prefix and a valid trailing checksum byte.
    val encoded = hex"0x0000000b68656c6c6f20776f726c642b".bits

    // Same frame with the checksum byte replaced by zero.
    val corrupted = hex"0x0000000b68656c6c6f20776f726c6400".bits

    "roundtrip" in {
      forAll { (text: String) =>
        roundtrip(codec, text)
      }
    }

    "roundtrip using combinators" in {
      forAll { (number: Int, text: String) =>
        roundtrip(int32 ~ codec, number ~ text)
      }
    }

    "append checksum on encode" in {
      val bits = codec.encode("hello world").require
      bits should equal(encoded)
    }

    "verify (and remove) checksum on decode" in {
      val decoded = codec.decode(encoded).require
      decoded.value should equal("hello world")
      decoded.remainder should equal(BitVector.empty)
    }

    "fail decoding on checksum mismatch" in {
      val mismatch = ChecksumCodec.Mismatch(hex"0x0000000b68656c6c6f20776f726c64".bits, hex"2b".bits, hex"00".bits)
      codec.decode(corrupted) should equal(Attempt.failure(mismatch))
    }
  }
}
/**
 * Codec that runs bits through a filter after encoding and before decoding.
 *
 * Encoding first encodes the value with `codec` and then feeds the resulting bits to
 * `filter.encode`. Decoding first applies `filter.decode`, decodes the filtered bits with
 * `codec`, and finally appends the filter's remainder to the remainder left by `codec`.
 *
 * @param filter a codec that represents pre/post-processing stages for input/output bits
 * @param codec the target codec
 * @tparam A the result type
 * @return a codec for `A` whose bits pass through `filter`; its size bound is that of `filter`
 */
def filtered[A](filter: Codec[BitVector], codec: Codec[A]): Codec[A] = new Codec[A] {

  def sizeBound: SizeBound = filter.sizeBound

  def encode(value: A): Attempt[BitVector] =
    for {
      raw <- codec.encode(value)
      processed <- filter.encode(raw)
    } yield processed

  def decode(bits: BitVector): Attempt[DecodeResult[A]] =
    for {
      unfiltered <- filter.decode(bits)
      result <- codec.decode(unfiltered.value)
    } yield result.mapRemainder(_ ++ unfiltered.remainder)
}
/**
 * Codec that appends a checksum on encode and verifies (and strips) it on decode.
 *
 * Encoding emits the target's bits followed by `checksum.encode` of those bits. Decoding uses
 * `checksum.decode` to obtain the covered bit-range, recomputes the checksum over that range and
 * compares it with the bits that immediately follow the range. On a match the range is handed to
 * `codec`; otherwise decoding fails with [[ChecksumCodec.Mismatch]], or with
 * `Err.InsufficientBits` when fewer bits than the checksum size follow the range.
 *
 * @param checksum a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range
 * @param codec the target codec
 * @tparam A the result type
 * @return a codec for `A` whose encoded form carries a trailing checksum
 * @see [[ChecksumCodec]]
 */
def checksummed[A](checksum: Codec[BitVector], codec: Codec[A]): Codec[A] = filtered(new Codec[BitVector] {

  def encode(value: BitVector): Attempt[BitVector] = checksum.encode(value).map(value ++ _)

  // The encoded form is the target's bits plus the checksum, so both bounds contribute;
  // reporting only the checksum's bound would understate the output size.
  def sizeBound: SizeBound = codec.sizeBound + checksum.sizeBound

  def decode(bits: BitVector): Attempt[DecodeResult[BitVector]] =
    checksum.decode(bits).flatMap { r =>
      // Recompute the checksum over the covered range and compare it with what follows the range.
      checksum.encode(r.value).flatMap { expected =>
        r.remainder.consumeThen(expected.size)(
          e => Attempt.failure(Err.InsufficientBits(expected.size, r.remainder.size, List(e))),
          (actual, remainder) =>
            if (expected == actual) Attempt.successful(DecodeResult(r.value, remainder))
            else Attempt.failure(ChecksumCodec.Mismatch(r.value, expected, actual)))
      }
    }
}, codec)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment