Skip to content

Instantly share code, notes, and snippets.

@drstevens
Created December 16, 2013 21:32
Show Gist options
  • Save drstevens/7994805 to your computer and use it in GitHub Desktop.
Save drstevens/7994805 to your computer and use it in GitHub Desktop.
// Experiments with scalaz-stream: an attempt to maintain state across successive calls to `next`.
import scala.collection.immutable
import scalaz.std.string._
import scalaz.std.list._
import scalaz.std.anyVal._
import scalaz.std.option._
import scalaz.syntax.show._
import scalaz.concurrent.Task
import scalaz.stream.{process1, io, Process}
import scalaz.stream.Process._
/**
 * Experiment: drive a stateful, manually-disposed resource from a scalaz-stream `Process`,
 * threading the resource's "cursor" state from one step to the next.
 *
 * Running this object writes the generated chunks to `/tmp/test.out`.
 */
object Foo extends App {

  /**
   * Builds a `Process` that acquires a resource, steps it repeatedly while threading a state
   * value of type `I` from each step into the next, and evaluates `release` once stepping ends.
   *
   * @param acquire task producing the initial state paired with the resource
   * @param release given the resource, produces the task that disposes of it
   * @param step    given the resource and the current state, produces the next state
   *                (`None` to stop) together with the value to emit
   * @tparam R the resource type
   * @tparam I the state threaded between steps
   * @tparam O the emitted element type
   * @return a process emitting every `O` produced by `step`
   */
  def resourceS[R, I, O](
      acquire: Task[(I, R)])(
      release: R => Task[Unit])(
      step: R => I => Task[(Option[I], O)]): Process[Task, O] = {

    // Emits the output of one step, then either recurses with the new state or finishes with
    // `onExit`. `onExit` is also passed as the two trailing arguments of `await`
    // (NOTE(review): these are the fallback/cleanup processes in the scalaz-stream API this
    // was written against — confirm against the library version in use).
    def go(i1: I, step: I => Task[(Option[I], O)], onExit: Process[Task, O]): Process[Task, O] =
      await[Task, (Option[I], O), O](step(i1))({
        case (Some(i2), o) => emit(o) ++ go(i2, step, onExit)
        case (None, o)     => emit(o) ++ onExit
      }, onExit, onExit)

    await(acquire)({
      case (i, r) =>
        // `suspend` defers building the release process (and therefore calling `release(r)`)
        // until that process is actually needed; `drain` discards the Unit result.
        val onExit = suspend(eval(release(r)).drain)
        go(i, step(r), onExit)
    }, halt, halt)
  }

  // ********* Simulates a managed resource ********

  /**
   * Stand-in for an awkward external API: the caller must feed the previously returned cursor
   * back into `next` and must remember to call `cleanup()`.
   *
   * @param init         cursor used when `next` is called with `None`
   * @param step         width of each returned chunk
   * @param maxInclusive largest value that will ever be produced
   */
  class NotSoNiceAPI(init: Long, step: Long, maxInclusive: Long) {

    /**
     * Given the current cursor (`None` meaning "start from `init`"), returns the next cursor,
     * or `None` once it would exceed `maxInclusive`, paired with the current chunk of values.
     */
    def next: (Option[Long]) => (Option[Long], immutable.Seq[Long]) =
      i => calcNext(i.getOrElse(init)) // previously hard-coded 0, silently ignoring `init`

    /** Prints "disposed!" to mark that the resource was released. */
    def cleanup(): Unit = "disposed!".println

    // Chunk is [start, start + step), clipped so it never goes past maxInclusive.
    private def calcNext(start: Long) =
      Some(start + step).filter(_ <= maxInclusive) -> (start until math.min(start + step, maxInclusive + 1))
  }

  // The state type `I` is Option[Long] here (the initial state is `none[Long]`).
  val test = resourceS(
    Task.delay((none[Long], new NotSoNiceAPI(0, 5, 100000000L))))(
    // Task.delay (not Task.now): cleanup is a side effect and must run only when the task runs.
    r => Task.delay(r.cleanup()))(
    r => i => Task.delay {
      // Task.delay defers the call to `next` until the task is run and captures any exception
      // it throws inside the Task instead of throwing while the process is being built.
      val (i2, o) = r.next(i)
      // Wrap once more: Some(Some(n)) = continue with cursor n, None = stop.
      i2.map(Some.apply) -> o
    })

  // Render each chunk as text, one chunk per line, UTF-8 encode and write to /tmp/test.out.
  val task = test.map(_.toList.shows)
    .intersperse("\n")
    .pipe(process1.utf8Encode)
    .to(io.fileChunkW("/tmp/test.out")).run

  // Nothing happens until the task is run.
  task.run
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment