Skip to content

Instantly share code, notes, and snippets.

@purefn
Created May 8, 2011 17:16
Show Gist options
  • Save purefn/961512 to your computer and use it in GitHub Desktop.
Save purefn/961512 to your computer and use it in GitHub Desktop.
cat in scala with Iteratees from scalaz-nio
package test
import scalaz.{Failure => _, _}
import Scalaz._
import effects._
import iteratees._
import java.io._
/** `cat`-style demo built on scalaz-nio iteratees: streams files to standard output. */
object ScalazIter {
  /** File that is read when no paths are given on the command line. */
  private val DefaultPath = "/tmp/test"

  /** Chunk size, in bytes, that `main` uses when reading files. */
  private val DefaultBufferSize = 4096

  /** Supplies the empty `Seq` as the empty chunk for sequence-valued streams. */
  implicit def seqEmptyChunk[B]: EmptyChunk[Seq[B]] = new EmptyChunk[Seq[B]] {
    val empty: Seq[B] = Seq.empty[B]
  }

  /**
   * Copies every file named in `args` to standard output, in argument order.
   * When `args` is empty the single file `/tmp/test` is used instead.
   * An exception raised while the pipeline runs is reported through `putStrLn`.
   *
   * @param args paths of the files to print
   */
  def main(args: Array[String]): Unit = {
    val paths: List[String] = if (args.isEmpty) List(DefaultPath) else args.toList
    val source = concatEnums(paths.map(path => enumFile(DefaultBufferSize, new File(path))))
    val io = source(iterOutputStream(System.out))
      .flatMap(_.run)
      .except(e => putStrLn(errorMessage(e)))
    io.unsafePerformIO
  }

  /** Message of `e`, or its `toString` when the exception carries no message. */
  private def errorMessage(e: Throwable): String =
    Option(e.getMessage).getOrElse(e.toString)

  /**
   * Chains the given enumerators left to right with `andThen`.
   * The fold is seeded with `enumInput(Chunk(ec.empty))`, i.e. an enumerator
   * built from a single empty chunk, so an empty `es` still yields an enumerator.
   *
   * @param es enumerators to run one after another
   * @param ec provides the empty chunk used for the seed
   */
  def concatEnums[C, M[_]](es: Seq[Enumerator[C, M]])(implicit ec: EmptyChunk[C]) =
    es.foldl(enumInput[C, M](Chunk(ec.empty))) { (acc, next) => acc andThen next }

  /**
   * Enumerator over the bytes of `file`.
   * The file is opened when the enumerator is applied and handed to `closeIt`
   * through `bracket` once the enumeration is over.
   *
   * @param bufferSize maximum number of bytes per chunk; must be positive
   * @param file       file to read
   */
  def enumFile(bufferSize: Int, file: File): Enumerator[Seq[Byte], IO] = new Enumerator[Seq[Byte], IO] {
    def apply[A](i: Iteratee[Seq[Byte], IO, A])(implicit m: Monad[IO]): IO[Iteratee[Seq[Byte], IO, A]] =
      openFileInputStream(file).bracket(closeIt)(in => enumInputStream(bufferSize)(in)(i))
  }

  /** Opens `file` for reading; the stream is only created when the action runs. */
  def openFileInputStream(file: File): IO[FileInputStream] = IO(rw => (rw, new FileInputStream(file)))

  /** Closes `c` as an `IO` action. */
  def closeIt(c: Closeable): IO[Unit] = IO(rw => (rw, c.close()))

  /**
   * Builds, for a given input stream, an enumerator that feeds the stream's
   * bytes to an iteratee in chunks of at most `bufferSize` bytes.
   *
   * The enumerator keeps reading until the stream is exhausted or the iteratee
   * stops accepting input. A read failure is delivered to the iteratee as
   * `EOF(Some(message))`. Reaching the end of the stream does NOT push an EOF:
   * the iteratee is returned as-is so that further enumerators can be chained
   * after this one.
   * NOTE(review): this assumes `Iteratee.run` delivers the final EOF — confirm
   * against the iteratee library.
   * NOTE(review): the loop recurses once per chunk through `flatMap`; whether
   * that is stack-safe for very large inputs depends on the `IO`
   * implementation — confirm.
   *
   * @param bufferSize maximum number of bytes per chunk; must be positive
   */
  def enumInputStream(bufferSize: Int): InputStream => Enumerator[Seq[Byte], IO] = is =>
    new Enumerator[Seq[Byte], IO] {
      // One reader, and therefore one buffer, per enumerator instead of per read.
      val readChunk = readInputStream(bufferSize)

      def apply[A](i: Iteratee[Seq[Byte], IO, A])(implicit m: Monad[IO]): IO[Iteratee[Seq[Byte], IO, A]] =
        i.fold(
          cont = k => {
            // Only the read itself is guarded, so failures raised later in the
            // pipeline are not mistaken for read errors.
            val nextInput: IO[Input[Seq[Byte]]] =
              readChunk(is)
                .map({ r: Read => r.fold[Input[Seq[Byte]]](eof = EOF(None), read = in => Chunk(in)) })
                .except(e => (EOF(Some(errorMessage(e))): Input[Seq[Byte]]).pure[IO])
            nextInput.flatMap({ in: Input[Seq[Byte]] =>
              in match {
                // More data: feed it and keep enumerating the resulting iteratee.
                case chunk@Chunk(_) => apply(k(chunk))
                // End of stream: stop without terminating the iteratee.
                case EOF(None) => i.pure[IO]
                // Read failure: hand the error to the iteratee and stop.
                case failed => k(failed).pure[IO]
              }
            })
          },
          done = (value, extra) => Done(value, extra).pure,
          error = (err, extra) => Failure(err, extra).pure
        )
    }

  /**
   * Returns a function that performs one read of up to `bufferSize` bytes from
   * an input stream and reports the outcome as a [[Read]].
   *
   * All reads made through the returned function share one buffer, so the
   * bytes are copied out of it as soon as the read completes; a later read
   * therefore cannot alter an earlier result.
   *
   * @param bufferSize maximum number of bytes per read; must be positive,
   *                   because a zero-length read never reports end-of-stream
   */
  def readInputStream(bufferSize: Int): InputStream => IO[Read] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val buffer = new Array[Byte](bufferSize)
    is => IO(rw => (rw, is.read(buffer, 0, bufferSize))) map { count =>
      // A negative count is InputStream's end-of-stream marker.
      val chunk: Option[Seq[Byte]] =
        if (count < 0) None
        else {
          val bytes: Seq[Byte] = buffer.slice(0, count)
          Some(bytes)
        }
      new Read {
        def fold[A](eof: => A, read: Seq[Byte] => A): A = chunk match {
          case Some(bytes) => read(bytes)
          case None => eof
        }
      }
    }
  }

  /**
   * Iteratee that writes every chunk it receives to `os`.
   * It finishes with `Done` on `EOF(None)` and with `Failure` on `EOF(Some(err))`.
   *
   * @param os destination stream; it is neither flushed nor closed here
   */
  def iterOutputStream(os: OutputStream): Iteratee[Seq[Byte], IO, Unit] = {
    def write(bytes: Seq[Byte]): IO[Unit] = IO(rw => (rw, os.write(bytes.toArray)))
    def step: Input[Seq[Byte]] => Iteratee[Seq[Byte], IO, Unit] = {
      // Write the chunk, then wait for the next input.
      case Chunk(xs) => FlattenI(write(xs) >>=| Cont(step).pure[IO])
      case eof@EOF(Some(err)) => Failure(err, eof)
      case eof@EOF(None) => Done((), eof)
    }
    Cont(step)
  }

  /** Outcome of a single read: either end-of-stream or the bytes that were read. */
  sealed trait Read {
    /**
     * @param eof  result to produce when the stream was exhausted
     * @param read builds the result from the bytes that were read
     */
    def fold[A](eof: => A, read: Seq[Byte] => A): A
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment