Skip to content

Instantly share code, notes, and snippets.

@fiadliel
Last active August 29, 2015 14:26
Show Gist options
  • Save fiadliel/45787aac12aaca86a0b4 to your computer and use it in GitHub Desktop.
Save fiadliel/45787aac12aaca86a0b4 to your computer and use it in GitHub Desktop.
OutputStream that writes to a scalaz-stream mutable Queue
import java.io.OutputStream
import scalaz.stream._
import scodec.bits.ByteVector
class QueueOutputStream(q: async.mutable.Queue[ByteVector]) extends OutputStream {
override def write(b: Int): Unit =
q.enqueueOne(ByteVector(b & 0xff)).run
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
val bv = ByteVector.view(b).drop(off).take(len).copy
q.enqueueOne(bv).run
}
override def close(): Unit =
q.close.run
// feels dangerous. Perhaps it should be a no-op instead.
override def flush(): Unit =
q.size.discrete.find(_ == 0).run.run
}
@fiadliel
Copy link
Author

fiadliel commented Aug 7, 2015

Use with q.dequeue, q.dequeueBatch, or q.dequeueAvailable as a source to a scalaz-stream pipeline.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment