Skip to content

Instantly share code, notes, and snippets.

@quelgar
Last active January 19, 2018 00:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quelgar/3e8aa15d3211f4c791acceeeef6beeef to your computer and use it in GitHub Desktop.
Save quelgar/3e8aa15d3211f4c791acceeeef6beeef to your computer and use it in GitHub Desktop.
Monix Pipe for byte stream framing
import java.nio.ByteBuffer
import monix.nio.text.UTF8Codec._
import monix.nio.file
import monix.reactive._
import monix.reactive.observers.Subscriber
import monix.execution._
import monix.execution.exceptions
import monix.execution.atomic.Atomic
import monix.reactive.subjects.Subject
import monix.execution.Scheduler.Implicits.global
private def findInBuffer(search: ByteBuffer, data: ByteBuffer): Boolean = {
val initDataPos = data.position
while (data.hasRemaining) {
if (data.remaining < search.remaining) {
return false
}
val dataMatch = data.asReadOnlyBuffer
var matched = true
val myBreaks = new Breaks
myBreaks.breakable {
while (search.hasRemaining && dataMatch.hasRemaining) {
if (search.get() != dataMatch.get()) {
matched = false
myBreaks.break()
}
}
}
search.position(0)
if (matched) {
return true
}
data.get() // increment position
}
data.position(initDataPos)
return false
}
object Framing {
def apply(delimiter: Array[Byte], maximumLength: Int) = new Framing(ByteBuffer.wrap(delimiter), maximumLength)
}
def log(s: String) = if (false) println(s)
final class Framing(delimiter: ByteBuffer, maximumLength: Int) extends Pipe[Array[Byte], Array[Byte]] {
override def unicast: (Observer[Array[Byte]], Observable[Array[Byte]]) = {
val sub = new Subject[Array[Byte], Array[Byte]] {
private[this] val subscriber = Atomic(Option.empty[Subscriber[Array[Byte]]])
private[this] val stopOnNext = Atomic(false)
private var buffer: ByteBuffer = _
override def size: Int = if (subscriber.get.nonEmpty) 1 else 0
override def unsafeSubscribeFn(subscriber: Subscriber[Array[Byte]]): Cancelable = {
if (!this.subscriber.compareAndSet(None, Some(subscriber))) {
subscriber.onError(exceptions.APIContractViolationException(this.getClass.getName))
Cancelable.empty
} else {
buffer = ByteBuffer.allocate(maximumLength)
Cancelable(() => stopOnNext.set(true))
}
}
override def onError(ex: Throwable): Unit = subscriber.get.foreach(_.onError(ex))
override def onComplete(): Unit = subscriber.get.foreach(_.onComplete())
override def onNext(elem: Array[Byte]): Future[Ack] = {
log(s"** onNext, ${elem.size}")
def onNextBuf(elemBuf: ByteBuffer): Future[Ack] = {
if (stopOnNext.get || subscriber.get.isEmpty) {
Ack.Stop
} else if (elemBuf.hasRemaining) {
log(s"** Element has ${elemBuf.remaining} remaining")
if (elemBuf.remaining > buffer.remaining) {
log(s"** Partial element copy, buffer remaining=${buffer.remaining}, element pos=${elemBuf.position}")
elemBuf.limit(elemBuf.position + buffer.remaining)
buffer.put(elemBuf)
elemBuf.limit(elemBuf.capacity)
} else {
log(s"** Full element copy")
buffer.put(elemBuf)
}
log(s"** Before doFind(), buffer position=${buffer.position}, buffer limit=${buffer.limit}")
buffer.flip()
def doFind(): Future[Ack] = {
val initLimit = buffer.limit
val initPos = buffer.position
log(s"** doFind, buffer pos=${buffer.position}, limit=${initLimit}")
if (findInBuffer(delimiter, buffer)) {
log(s"** FOUND, buffer pos=${buffer.position}, limit=${buffer.limit}")
buffer.limit(buffer.position)
buffer.position(initPos)
val out = new Array[Byte](buffer.remaining)
buffer.get(out)
buffer.limit(initLimit)
buffer.position(buffer.position + delimiter.remaining)
subscriber.get.get.onNext(out).flatMap {
case Ack.Stop => Ack.Stop
case Ack.Continue => doFind()
}
} else {
log(s"** NOT FOUND, buffer pos=${buffer.position}, limit=${buffer.limit}")
buffer.compact()
onNextBuf(elemBuf)
}
}
doFind()
} else {
log("** Element processed")
Ack.Continue
}
}
onNextBuf(ByteBuffer.wrap(elem))
}
}
(sub, sub)
}
}
val lineTerm = Array('\n'.toByte)
val lines = file.readAsync(someFile, 500)
.pipeThrough(Framing(lineTerm, 200))
.pipeThrough(utf8Decode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment