Skip to content

Instantly share code, notes, and snippets.

@jkpl
Last active September 10, 2015 10:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkpl/034d6e7a8371157990d8 to your computer and use it in GitHub Desktop.
Save jkpl/034d6e7a8371157990d8 to your computer and use it in GitHub Desktop.
Async stream decoding using scodec
package virta.dekoodaus
import java.nio.charset.Charset
import java.util.concurrent.Executors
import scodec.Codec
import scodec.bits.BitVector
import scodec.codecs._
import scodec.stream._
import scodec.stream.decode.StreamDecoder
import scalaz.concurrent.Task
import scalaz.stream.{async, Process}
import scalaz.stream.async.mutable.Queue
object VirtaDekoodaus {
import Payload._
val data = List(
payloadBits("foobar"),
payloadBits(10, 1),
payloadBits("ohai there"),
payloadBits(0, 0), // Stops stream
payloadBits("not consumed")
)
def start(): Unit = {
val executor = Executors.newSingleThreadExecutor()
// Wire up queue and stream decoding process
val (queue, streamProcess) = queuedStream()
val streamTask = streamProcess.map(printValue("Got value")).run
// Start the decoding process in another execution thread
Task.fork(streamTask)(executor).runAsync { result =>
println("Decoding finished. Result: " + result)
executor.shutdown()
}
// Split our input data into 10-bit segments
val segmentedData = data.reduce(_ ++ _).grouped(10)
// Enqueue all of the segments and close the queue
segmentedData.foreach { d => queue.enqueueOne(d).run }
queue.close.run
}
def payloadStream: StreamDecoder[Payload] = {
decode.tryOnce(payloadCodec) flatMap { v =>
if (v.isStop) decode.halt
else decode.emit(v) ++ payloadStream
}
}
def queuedStream(bufferSize: Int = 100): (Queue[BitVector], Process[Task, Payload]) = {
val queue = async.boundedQueue[BitVector](bufferSize)
val bits = toLazyBitVector(queue.dequeue)
val process = payloadStream.decode(bits)
(queue, process)
}
def printValue[T](prefix: String)(value: T): T = {
println(prefix + ": " + value)
value
}
}
object Payload {
implicit val payload1Codec: Codec[Payload1] =
(("P1 first" | uint(8)) :: ("P2 second" | uint(8))).as[Payload1]
implicit val payload2Codec: Codec[Payload2] =
("P2 value" | fixedSizeBytes(10, ascii)).as[Payload2]
implicit val payloadCodec: Codec[Payload] =
either(bool, payload1Codec, payload2Codec).xmap(fromEither, toEither)
def payload1Bits(first: Int, second: Int) = {
BitVector.fromInt(first, 8) ++ BitVector.fromInt(second, 8)
}
def payload2Bits(value: String) = {
BitVector.encodeString(value.padTo(10, '-').take(10))(Charset.forName("ascii")) match {
case Right(v) => v
case Left(e) => throw e // Don't try this at home, kids
}
}
def payloadBits(value: String): BitVector =
BitVector.bit(true) ++ payload2Bits(value)
def payloadBits(first: Int, second: Int): BitVector =
BitVector.bit(false) ++ payload1Bits(first, second)
private def fromEither(e: Either[Payload1, Payload2]): Payload = e.fold(identity, identity)
private def toEither(p: Payload): Either[Payload1, Payload2] = p match {
case p1: Payload1 => Left(p1)
case p2: Payload2 => Right(p2)
}
}
sealed trait Payload {
def isStop: Boolean
}
case class Payload1(first: Int, second: Int) extends Payload {
override def isStop: Boolean = first == 0 && second == 0
}
case class Payload2(value: String) extends Payload {
override def isStop: Boolean = value.toLowerCase.startsWith("stop")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment