Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Last active August 29, 2015 13:57
Show Gist options
  • Save pchiusano/9769027 to your computer and use it in GitHub Desktop.
Save pchiusano/9769027 to your computer and use it in GitHub Desktop.
Break a binary input stream along frame boundaries using a pure `Process1`
import scodec.codecs
import scodec.bits.{BitVector,ByteVector}
import scalaz.stream.{Process1,Process}
object Unframing {
/**
* Break an input bytestream along frame boundaries. Input consists of a stream of frames,
* where each frame is just a number of bytes, encoded as an int32, followed by a packet of
* that many bytes. End of stream is indicated with a frame of size <= 0. Output stream is
* the stream of frame payloads. Input stream may be chunked at any granularity.
*/
def unframe: Process1[ByteVector,ByteVector] = {
// parser can be in one of two states - waiting for 4 bytes to accumulate, to read
// the frame 'header' (just the size of the frame in bytes)
def frameHeader(acc: ByteVector): Process1[ByteVector,ByteVector] =
if (acc.size < 4) Process.await1[ByteVector].flatMap(bs => frameHeader(acc ++ bs))
else codecs.int32.decode(acc.toBitVector).fold (
errMsg => Process.fail(new IllegalArgumentException(errMsg)),
{ case (rem,size) => if (size <= 0) Process.halt else readFrame(size,rem) }
)
// or it can be in the state of accumulating a number of bytes specified by the
// frame header
def readFrame(bytesToRead: Int, bits: BitVector): Process1[ByteVector,ByteVector] =
if (bits.size / 8 >= bytesToRead) {
val bytes = bits.toByteVector
val frame = bytes.take(bytesToRead)
Process.emit(frame) fby frameHeader(bytes.drop(bytesToRead))
}
else
Process.await1[ByteVector].flatMap {
bs => readFrame(bytesToRead, bits ++ BitVector(bs))
}
frameHeader(ByteVector.empty)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment