Last active
September 10, 2015 10:11
-
-
Save jkpl/034d6e7a8371157990d8 to your computer and use it in GitHub Desktop.
Async stream decoding using scodec
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package virta.dekoodaus | |
import java.nio.charset.Charset | |
import java.util.concurrent.Executors | |
import scodec.Codec | |
import scodec.bits.BitVector | |
import scodec.codecs._ | |
import scodec.stream._ | |
import scodec.stream.decode.StreamDecoder | |
import scalaz.concurrent.Task | |
import scalaz.stream.{async, Process} | |
import scalaz.stream.async.mutable.Queue | |
/** Demo driver: feeds encoded payload bits through an async queue and decodes
  * them into [[Payload]] values on a separate thread.
  */
object VirtaDekoodaus {
  import Payload._

  /** Sample input. The `(0, 0)` payload satisfies `isStop`, so decoding halts
    * there and the final entry is never decoded.
    */
  val data: List[BitVector] = List(
    payloadBits("foobar"),
    payloadBits(10, 1),
    payloadBits("ohai there"),
    payloadBits(0, 0), // Stops stream
    payloadBits("not consumed")
  )

  /** Runs the demo: starts the decoding task on a single-thread executor, then
    * pushes `data` into the queue in 10-bit segments and closes the queue.
    */
  def start(): Unit = {
    val executor = Executors.newSingleThreadExecutor()

    // Wire up queue and stream decoding process
    val (queue, streamProcess) = queuedStream()
    val streamTask = streamProcess.map(printValue("Got value")).run

    // Start the decoding process in another execution thread; the executor is
    // shut down from the completion callback, whatever the result was.
    Task.fork(streamTask)(executor).runAsync { result =>
      println(s"Decoding finished. Result: $result")
      executor.shutdown()
    }

    // Split our input data into 10-bit segments. Folding from the empty vector
    // (rather than `reduce`) stays well-defined even if `data` were empty.
    val segmentedData = data.foldLeft(BitVector.empty)(_ ++ _).grouped(10)

    // Enqueue all of the segments. The queue is closed in `finally` so that a
    // failing enqueue cannot leave the queue open (and with it the decoding
    // task unfinished and the executor never shut down).
    try {
      segmentedData.foreach { segment => queue.enqueueOne(segment).run }
    } finally {
      queue.close.run
    }
  }

  /** Stream decoder that repeatedly decodes one [[Payload]] with `payloadCodec`.
    *
    * A payload whose `isStop` is true halts the stream without being emitted;
    * any other payload is emitted and decoding continues recursively.
    */
  def payloadStream: StreamDecoder[Payload] =
    decode.tryOnce(payloadCodec) flatMap { payload =>
      if (payload.isStop) decode.halt
      else decode.emit(payload) ++ payloadStream
    }

  /** Creates a bounded queue of bit chunks together with the process that
    * decodes payloads from whatever is dequeued from it.
    *
    * @param bufferSize capacity passed to `async.boundedQueue`
    * @return the queue to feed bits into, and the decoding process reading from it
    */
  def queuedStream(bufferSize: Int = 100): (Queue[BitVector], Process[Task, Payload]) = {
    val queue = async.boundedQueue[BitVector](bufferSize)
    val bits = toLazyBitVector(queue.dequeue)
    val process = payloadStream.decode(bits)
    (queue, process)
  }

  /** Prints `value` prefixed with `prefix` and returns it unchanged, so it can
    * be used inside a `map` without altering the stream.
    */
  def printValue[T](prefix: String)(value: T): T = {
    println(s"$prefix: $value")
    value
  }
}
/** Codecs for [[Payload]] values plus helpers that build the matching raw bits by hand. */
object Payload {

  /** Two unsigned 8-bit integers, mapped to [[Payload1]]. */
  implicit val payload1Codec: Codec[Payload1] =
    // Both fields belong to Payload1, so both context labels use the "P1" prefix.
    (("P1 first" | uint(8)) :: ("P1 second" | uint(8))).as[Payload1]

  /** A fixed 10-byte ASCII string, mapped to [[Payload2]]. */
  implicit val payload2Codec: Codec[Payload2] =
    ("P2 value" | fixedSizeBytes(10, ascii)).as[Payload2]

  /** A one-bit discriminator followed by either a [[Payload1]] or a [[Payload2]]. */
  implicit val payloadCodec: Codec[Payload] =
    either(bool, payload1Codec, payload2Codec).xmap(fromEither, toEither)

  /** Raw bits of a [[Payload1]] body: each value rendered as 8 bits, `first` then `second`. */
  def payload1Bits(first: Int, second: Int): BitVector =
    BitVector.fromInt(first, 8) ++ BitVector.fromInt(second, 8)

  /** Raw bits of a [[Payload2]] body.
    *
    * The text is padded with `'-'` up to 10 characters and cut to exactly 10
    * before being encoded with the `ascii` charset. If encoding fails, the
    * encoding error is rethrown.
    */
  def payload2Bits(value: String): BitVector =
    BitVector.encodeString(value.padTo(10, '-').take(10))(Charset.forName("ascii")) match {
      case Right(bits) => bits
      case Left(error) => throw error // Don't try this at home, kids
    }

  /** Bits of a complete string payload: a `true` discriminator bit, then the [[Payload2]] body. */
  def payloadBits(value: String): BitVector =
    BitVector.bit(true) ++ payload2Bits(value)

  /** Bits of a complete integer-pair payload: a `false` discriminator bit, then the [[Payload1]] body. */
  def payloadBits(first: Int, second: Int): BitVector =
    BitVector.bit(false) ++ payload1Bits(first, second)

  // Collapses both sides of the Either into the common Payload supertype.
  private def fromEither(e: Either[Payload1, Payload2]): Payload = e.fold(identity, identity)

  // Inverse of fromEither: Payload1 goes left, Payload2 goes right.
  private def toEither(p: Payload): Either[Payload1, Payload2] = p match {
    case p1: Payload1 => Left(p1)
    case p2: Payload2 => Right(p2)
  }
}
/** A message carried in the stream; either a [[Payload1]] or a [[Payload2]]. */
sealed trait Payload {

  /** Whether this payload is a stop marker. */
  def isStop: Boolean
}

/** A pair of integers; it is a stop marker when both are zero. */
final case class Payload1(first: Int, second: Int) extends Payload {
  override def isStop: Boolean = first == 0 && second == 0
}

/** A text value; it is a stop marker when the text starts with "stop" (case-insensitively). */
final case class Payload2(value: String) extends Payload {
  override def isStop: Boolean = value.toLowerCase.startsWith("stop")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment