Skip to content

Instantly share code, notes, and snippets.

@Pyppe
Created September 7, 2015 10:46
Show Gist options
  • Save Pyppe/f4739730231143ca5ed3 to your computer and use it in GitHub Desktop.
Save Pyppe/f4739730231143ca5ed3 to your computer and use it in GitHub Desktop.
"pool-1-thread-4" #31 daemon prio=5 os_prio=0 tid=0x00007f3cb8014800 nid=0x377b waiting on condition [0x00007f3cff8f8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec1cc150> (a java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at scalaz.concurrent.StrategysLow$$anon$3$$anonfun$apply$2.apply(Strategy.scala:81)
at scala.Function0$class.apply$mcZ$sp(Function0.scala:34)
/** Adapts a scalaz-stream queue to the `java.io.OutputStream` API.
  *
  * Every write is turned into one `ByteVector` chunk that is enqueued on `q`;
  * the enqueue task is run synchronously on the calling thread.
  * NOTE(review): with a bounded queue this presumably blocks the writer while
  * the queue is full — confirm against the scalaz-stream version in use.
  *
  * @param id identifier supplied by the caller; not used by any method of this class
  * @param q  queue that receives the written chunks
  */
class QueueOutputStream(id: String, q: async.mutable.Queue[ByteVector]) extends OutputStream {

  /** Enqueues a single-element chunk built from `b`. */
  override def write(b: Int): Unit = q.enqueueOne(ByteVector(b)).run

  /** Enqueues a copy of `b(off until off + len)` as one chunk.
    *
    * @throws IndexOutOfBoundsException if `off` or `len` is negative, or if
    *                                   `off + len` exceeds `b.length`
    */
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    // drop/take would silently clamp a bad range; OutputStream requires an exception instead.
    // `len > b.length - off` avoids the Int overflow that `off + len > b.length` could hit.
    if (off < 0 || len < 0 || len > b.length - off)
      throw new IndexOutOfBoundsException(s"off=$off, len=$len, array length=${b.length}")
    // A zero-length write is a no-op: do not spend a queue slot on an empty chunk.
    if (len > 0) {
      // Copy the slice because the caller is free to reuse `b` after write returns.
      val chunk = ByteVector.view(b).drop(off).take(len).copy
      q.enqueueOne(chunk).run
    }
  }

  /** Closes the underlying queue. */
  override def close(): Unit = q.close.run

  /** Waits until the queue's size signal reports 0 (or the signal terminates).
    *
    * A size of 0 only shows that the chunks were taken off the queue, not that the
    * consumer finished processing them.
    * NOTE(review): this can wait indefinitely if nothing dequeues — confirm that a
    * consumer is always running when flush is called.
    */
  override def flush(): Unit = q.size.discrete.find(_ == 0).run.run
}
// Usage
/** Exposes the bytes that `f` writes to an `OutputStream` as a `Process` of chunks.
  *
  * `f` is submitted to `Strategy.DefaultStrategy` as soon as this method is called
  * and writes into a queue bounded at 8 chunks; the returned process dequeues from
  * that queue.
  *
  * The queue is always terminated once `f` is done: it is closed when `f` returns
  * (covering an `f` that forgets to close the stream) and failed with the exception
  * when `f` throws. Without this the returned process would never end and the
  * producer's exception would be lost, because the strategy's result is discarded.
  *
  * @param f producer that writes its output to the supplied stream
  * @return process emitting the written chunks in order
  */
def processOutputStream(f: OutputStream => Unit): Process[Task, ByteVector] = {
  import scala.util.control.NonFatal

  val queue = async.boundedQueue[ByteVector](8)
  val qos = new QueueOutputStream(id, queue)
  Strategy.DefaultStrategy {
    try {
      f(qos)
      // NOTE(review): closing an already-closed queue is expected to be a no-op — confirm.
      queue.close.run
    } catch {
      // Hand the failure to the consumer instead of dropping it with the discarded result.
      case NonFatal(e) => queue.fail(e).run
    }
  }
  queue.dequeue
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment