Skip to content

Instantly share code, notes, and snippets.

@sadache
Last active August 29, 2015 13:56
Show Gist options
  • Save sadache/8804038 to your computer and use it in GitHub Desktop.
Save sadache/8804038 to your computer and use it in GitHub Desktop.
A naive, potentially buggy and largely improvable implementation of Enumerator[Array[Byte]] to InputStream
import java.io.IOException
import java.io.InputStream
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import play.api.libs.iteratee.{ Enumerator, Iteratee }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.{ Success, Failure }
/**
 * Adapts an `Enumerator[Array[Byte]]` to a blocking `java.io.InputStream`.
 *
 * The enumerator is consumed asynchronously and its chunks are handed to the
 * reader through a bounded queue. When the queue is full the producer is
 * suspended (without blocking a thread) until the reader frees a slot or the
 * stream is closed. The returned stream is meant to be read by a single thread.
 *
 * @param chunks     source of byte chunks
 * @param bufferSize maximum number of chunks buffered ahead of the reader (must be >= 1)
 * @return an `InputStream` that yields the concatenated chunks; once all buffered
 *         data has been served, `read` returns -1 on normal completion or after
 *         `close()`, and throws `IOException` if the enumerator failed
 */
def toInputStream(chunks: Enumerator[Array[Byte]], bufferSize: Int): InputStream = new InputStream {
  // Hand-off buffer between producer and reader; `None` marks the end of the stream.
  // Improvement: replace BlockingQueue with a ring array buffer and block read manually
  private val queue = new ArrayBlockingQueue[Option[Array[Byte]]](bufferSize)
  // Promise the producer is currently parked on, if any. It is never cleared by the
  // reader: completing an already-completed promise is a no-op, and not clearing
  // avoids clobbering a newer promise published concurrently by the producer.
  @volatile private var waiting: Option[Promise[Unit]] = None
  // Written by close(), read by the producer thread, hence volatile.
  @volatile private var closed = false
  // Error raised by the enumerator, published before the end marker is enqueued.
  @volatile private var failure: Option[Throwable] = None
  // Bytes enqueued but not yet returned to the caller.
  private val availableBytes: AtomicInteger = new AtomicInteger(0)
  // Reader-only state: unread tail of the last chunk (always non-empty when defined).
  private var remaining: Option[Array[Byte]] = None
  // Reader-only state: set once the end marker has been taken from the queue.
  private var done = false

  /** Wakes the producer if it is parked waiting for queue space. */
  private def tryFreeEnumerator(): Unit = waiting.foreach(_.trySuccess(()))

  /**
   * Enqueues `item` without blocking; if the queue is full, parks on a promise and
   * retries once woken. The resulting flag tells the iteratee to stop (stream closed).
   */
  private def enqueue(item: Option[Array[Byte]]): Future[(Unit, Boolean)] =
    if (closed) Future.successful(((), true))
    else if (queue.offer(item)) {
      item.foreach(c => availableBytes.addAndGet(c.length))
      Future.successful(((), closed))
    } else {
      val parked = Promise[Unit]()
      waiting = Some(parked)
      // Re-check after publishing the promise: the reader may have freed a slot (or
      // closed the stream) between the failed offer and the assignment above.
      if (queue.remainingCapacity() > 0 || closed) parked.trySuccess(())
      parked.future.flatMap(_ => enqueue(item))
    }

  /** Result of a read once no more data can be served. */
  private def endOfStream(): Int =
    if (closed) -1
    else failure match {
      case Some(e) => throw new IOException("Exception while feeding Enumerator to inputstream", e)
      case None => -1
    }

  override def close(): Unit = {
    closed = true
    // Best effort: unblock a reader currently waiting in take(); if the queue is
    // full no reader can be blocked there, so a failed offer is harmless.
    queue.offer(None)
    tryFreeEnumerator()
  }

  override def available(): Int =
    if (closed) 0 else math.max(availableBytes.get(), 0)

  override def read(): Int = {
    val one = new Array[Byte](1)
    // Mask to 0..255 as required by InputStream.read(); propagate end of stream as -1.
    if (read(one, 0, 1) == -1) -1 else one(0) & 0xff
  }

  override def read(b: Array[Byte]): Int = read(b, 0, b.length)

  override def read(b: Array[Byte], off: Int, len: Int): Int = {
    if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException()
    if (len == 0) 0
    else if (closed) -1
    else if (done && remaining.isEmpty) endOfStream()
    else {
      var copied = 0
      // Serve what is left over from the previous chunk first.
      remaining.foreach { arr =>
        copied = math.min(arr.length, len)
        System.arraycopy(arr, 0, b, off, copied)
        remaining = if (copied < arr.length) Some(arr.drop(copied)) else None
      }
      // Block only while nothing has been copied yet; afterwards just drain
      // whatever is already queued so a partial read returns promptly.
      while (copied < len && !done && (copied == 0 || queue.peek() != null)) {
        val item = queue.take()
        tryFreeEnumerator() // a slot was just freed
        item match {
          case Some(c) =>
            val n = math.min(c.length, len - copied)
            System.arraycopy(c, 0, b, off + copied, n)
            copied += n
            if (n < c.length) remaining = Some(c.drop(n))
          case None => done = true
        }
      }
      availableBytes.addAndGet(-copied)
      // copied == 0 here implies the end marker was reached without any data.
      if (copied == 0) endOfStream() else copied
    }
  }

  // Start feeding last, once every field above is initialised.
  private val feeding: Future[Unit] =
    chunks |>>> Iteratee.fold2[Array[Byte], Unit](()) { (_, chunk) => enqueue(Some(chunk)) }

  feeding.onComplete {
    case Success(_) => enqueue(None)
    case Failure(e) =>
      // Publish the error before the end marker so the reader sees it and rethrows.
      failure = Some(e)
      enqueue(None)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment