
@florianverhein
Last active September 7, 2015 00:02
Drive scalaz.stream.Process externally and expose as Iterator
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.Process
import scalaz.stream.Process._ // Step, Emit, Await, Halt
import scalaz.stream.Cause._   // End, Error, Terminated

/**
 * Turn a Process[Task,O] into an Iterator[O].
 *
 * Uses the toTask trick discussed here: https://groups.google.com/forum/#!topic/scalaz/gx0eXHpQN48
 * Note: "It's a hack because it's not resource safe - if you stop examining the `Task` before
 * it completes, finalizers for the stream are not guaranteed to be run".
 * Hence, the iterator should always be consumed completely.
 *
 * An earlier attempt at tackling this problem is kept below.
 */
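class TaskProcessIterator[O](p: Process[Task, O]) extends Iterator[O] {
  // Each run of `task` evaluates the process up to its next emitted value;
  // termination is reported as a Terminated(End) failure (see `step`).
  private val task: Task[O] = p.toTask
  private val queue = scala.collection.mutable.Queue[O]()
  // Set once the process has terminated, so `task` is never run again afterwards.
  private var done = false

  // Run the task once, buffering the emitted value or recording termination.
  private def step(): Unit =
    task.attemptRun match {
      case \/-(o) =>
        queue += o
      case -\/(Terminated(End)) =>
        done = true // normal termination
      case -\/(Terminated(Error(rsn))) =>
        done = true
        throw rsn
      case -\/(other) =>
        // Any other failure (e.g. Terminated(Kill)) is rethrown as-is.
        done = true
        throw other
    }

  override def hasNext: Boolean = {
    if (queue.isEmpty && !done) step()
    queue.nonEmpty
  }

  // Honours the Iterator contract: next() may be called without a preceding hasNext.
  override def next(): O = {
    if (!hasNext) throw new NoSuchElementException("next on exhausted TaskProcessIterator")
    queue.dequeue()
  }
}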
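
/*
 * A stdlib-only sketch of why the resource-safety caveat above matters. This is an
 * illustrative model, not scalaz-stream: `PullSource` and `PullIterator` are hypothetical
 * names. `pull()` stands in for `task.attemptRun`: each call yields the next value, or
 * None once the source is exhausted, and only at that point does the finalizer
 * (`onClose`) run. An iterator that is abandoned part-way therefore never closes its source.
 */
final class PullSource[O](values: Seq[O], onClose: () => Unit) {
  private var rest = values.toList
  private var closed = false

  // Next value, or None after running the finalizer (at most once).
  def pull(): Option[O] = rest match {
    case head :: tail =>
      rest = tail
      Some(head)
    case Nil =>
      if (!closed) { closed = true; onClose() }
      None
  }
}

// Mirrors TaskProcessIterator: a one-element buffer filled by one pull per hasNext.
final class PullIterator[O](src: PullSource[O]) extends Iterator[O] {
  private var buffered: Option[O] = None
  private var done = false

  override def hasNext: Boolean = {
    if (buffered.isEmpty && !done) {
      buffered = src.pull()
      done = buffered.isEmpty
    }
    buffered.isDefined
  }

  override def next(): O = {
    if (!hasNext) throw new NoSuchElementException("next on exhausted PullIterator")
    val o = buffered.get
    buffered = None
    o
  }
}

// Usage: after two next() calls on new PullIterator(new PullSource(Seq(1, 2, 3), close)),
// `close` has not run yet; it runs only once the iterator has been drained.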
/**
 * A first (flawed) attempt at driving a scalaz-stream Process externally and exposing its
 * stream of values as an iterator.
 * Based on Process.runFoldMap.
 */
class ProcessIterator[F[_], O](p: Process[F, O]) extends Iterator[O] {
  private val queue = scala.collection.mutable.Queue[O]()
  private var current = p
  //FIXME NOTE: F is unused! Await steps are skipped rather than evaluated in F, so
  //processes with effects (contexts) are not supported and will fail.
  //TODO How can we fix this?

  // Step the process until it emits a chunk of values or halts.
  private def nextEmitted(p: Process[F, O]): (Seq[O], Process[F, O]) = {
    def go(cur: Process[F, O]): (Seq[O], Process[F, O]) =
      cur.step match {
        case s: Step[F, O] @unchecked =>
          (s.head, s.next) match {
            case (Emit(os), cont) =>
              (os, cont.continue.asInstanceOf[Process[F, O]])
            case (_: Await[F, Any, O] @unchecked, cont) =>
              // The request is never run: this is where F would be needed.
              go(cont.continue.asInstanceOf[Process[F, O]])
          }
        case Halt(x) => (Seq.empty, Halt(x).asInstanceOf[Process[F, O]])
      }
    go(p)
  }

  // True once the process has reached a Halt, whatever the cause.
  private def halted: Boolean = current match {
    case Halt(_) => true
    case _ => false
  }

  override def hasNext: Boolean = {
    // Keep stepping until a value is buffered or the process halts, so that an
    // empty Emit does not end iteration prematurely.
    while (queue.isEmpty && !halted) {
      val (os, next) = nextEmitted(current)
      queue ++= os
      current = next
    }
    current match {
      case Halt(Error(rsn)) if queue.isEmpty => throw rsn
      case _ =>
    }
    queue.nonEmpty
  }

  // Honours the Iterator contract: next() may be called without a preceding hasNext.
  override def next(): O = {
    if (!hasNext) throw new NoSuchElementException("next on exhausted ProcessIterator")
    queue.dequeue()
  }
}
/*
Works:
  val p = Process.range(1, 10)
  val it = new ProcessIterator(p)
  it.toArray

As expected, fails on a process with effects, since its Await steps are skipped:
  val p = scalaz.stream.io.linesR("build.sbt")

Note to self: how could linesR be implemented without Task?
i.e. rather than using Task.run to drive the state machine, it should be
possible to drive a process that reads from a file externally too...
*/
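
/*
 * One possible direction for the note above, sketched on a toy model rather than on
 * scalaz-stream itself. `Toy`, `ToyStream`, `Out`, `Req`, `Done`, `ToyIterator` and
 * `linesFrom` are hypothetical names. Like Process, the toy stream is a state machine of
 * emits, effectful requests and halts; unlike ProcessIterator (which skips Await steps),
 * the driver here performs each request itself, so a line-reading source can be stepped
 * externally without Task.run. Assumption: requests are synchronous thunks; effects in a
 * real F would need an interpreter from F to a synchronous result.
 */
object Toy {
  sealed trait ToyStream[+O]
  // Emit a chunk of values, then continue with `rest`.
  final case class Out[+O](values: Seq[O], rest: () => ToyStream[O]) extends ToyStream[O]
  // Perform a side effect whose result is the remainder of the stream.
  final case class Req[+O](effect: () => ToyStream[O]) extends ToyStream[O]
  case object Done extends ToyStream[Nothing]

  // Drives a ToyStream externally: hasNext steps until a value is buffered or Done.
  final class ToyIterator[O](start: ToyStream[O]) extends Iterator[O] {
    private var current: ToyStream[O] = start
    private var buffer: List[O] = Nil

    override def hasNext: Boolean = {
      while (buffer.isEmpty && current != Done) {
        current match {
          case Out(values, rest) =>
            buffer = values.toList
            current = rest()
          case Req(effect) =>
            current = effect() // the driver, not Task.run, performs the effect
          case Done =>
        }
      }
      buffer.nonEmpty
    }

    override def next(): O = {
      if (!hasNext) throw new NoSuchElementException("next on exhausted ToyIterator")
      val o = buffer.head
      buffer = buffer.tail
      o
    }
  }

  // Hypothetical line reader: each Req reads one line from `lines` (standing in for a
  // file handle) and emits it; reaching the end runs `onClose` and halts.
  def linesFrom(lines: Iterator[String], onClose: () => Unit): ToyStream[String] =
    Req[String] { () =>
      if (lines.hasNext) Out(Seq(lines.next()), () => linesFrom(lines, onClose))
      else { onClose(); Done }
    }
}

// Usage: new Toy.ToyIterator(Toy.linesFrom(Iterator("a", "b"), close)).toList
// reads both lines and then runs `close`.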