Skip to content

Instantly share code, notes, and snippets.

@quelgar
Last active October 27, 2019 14:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quelgar/f34dde2d33e3ea58a5ef4f0b2ab193a8 to your computer and use it in GitHub Desktop.
Save quelgar/f34dde2d33e3ea58a5ef4f0b2ab193a8 to your computer and use it in GitHub Desktop.
InputStream to ZIO Stream of byte chunks.
import java.io.InputStream
import java.nio.file.{Files, Path, Paths}
import scalaz.zio.console.putStrLn
import scalaz.zio.duration._
import scalaz.zio.stream.Stream
import scalaz.zio.stream.Stream.Fold
import scalaz.zio.{App, Chunk, IO, Managed}
object ZioInputStreamTest extends App {

  /**
   * Constructs a stream that repeatedly reads from a resource that must be acquired and released.
   *
   * Reading stops when the reading function returns None.
   *
   * @param acquire effect that obtains the resource
   * @param release effect that disposes of the resource
   * @param read    effect that produces the next element, or None when there is nothing more to read
   * @return a stream of the elements produced by `read`
   */
  def bracketLoop[E, R, A](acquire: IO[E, R])(release: R => IO[Nothing, Unit])(read: R => IO[E, Option[A]]): Stream[E, A] = {
    managedLoop(Managed.make(acquire)(release))(read)
  }

  /**
   * Constructs a stream that repeatedly reads from a managed resource.
   *
   * Reading stops when the reading function returns None, or as soon as the
   * fold's continuation predicate rejects the current state.
   *
   * @param m    the managed resource to read from
   * @param read effect that produces the next element, or None when there is nothing more to read
   * @return a stream of the elements produced by `read`
   */
  def managedLoop[E, R, A](m: Managed[E, R])(read: R => IO[E, Option[A]]): Stream[E, A] = {
    new Stream[E, A] {
      override def fold[E1 >: E, A1 >: A, S]: Fold[E1, A1, S] = {
        IO.succeed { (s, cont, f) =>
          // Do not touch the resource at all when the consumer does not want any element.
          if (cont(s)) {
            m.use { r =>
              // The continuation predicate is consulted before every read, not just once
              // up front, so a consumer that stops early does not drain the whole resource.
              def loop(s2: S): IO[E1, S] = {
                if (cont(s2)) {
                  read(r).flatMap {
                    case Some(a) => f(s2, a).flatMap(loop)
                    case None => IO.succeed(s2)
                  }
                } else {
                  IO.succeed(s2)
                }
              }

              loop(s)
            }
          } else {
            IO.succeed(s)
          }
        }
      }
    }
  }

  private val terminateWait = 5.seconds

  // Size in bytes of the buffer used for each read from the input stream.
  private val readBufferSize = 2000

  private val errMsg = (_: Exception).toString

  private def openFile(p: Path): IO[String, InputStream] = IO.syncException(Files.newInputStream(p)).mapError(errMsg)

  /**
   * Reads the file named by the first argument in chunks, printing the size of each
   * chunk and finally the number of chunks and the total number of bytes read.
   *
   * Any failure is printed and turned into exit code 1; success yields exit code 0.
   */
  override def run(args: List[String]): IO[Nothing, ExitStatus] = {
    val program = for {
      file <- args.headOption.map(x => IO.succeed(Paths.get(x))).getOrElse(IO.fail("File to read must be specified"))
      size <- IO.syncException(Files.size(file)).mapError(errMsg)
      _ <- putStrLn(s"File size is $size bytes")
      // a stream of byte chunks read from the specified file
      // (errors while closing the stream are deliberately ignored: release cannot fail)
      stream = bracketLoop(openFile(file))(in => IO.syncException(in.close()).attempt.void) { in =>
        IO.syncException {
          val buf = Array.ofDim[Byte](readBufferSize)
          val read = in.read(buf)
          if (read < 0) {
            None
          } else {
            // only the first `read` bytes of the buffer hold data
            Some(Chunk.fromArray(buf).take(read))
          }
        }.mapError(errMsg)
      }
      // print the size of each chunk read, and track the total size and number of chunks
      // requires https://github.com/oleg-py/better-monadic-for
      // the byte total is a Long so files larger than Int.MaxValue bytes do not overflow
      (count, totalSize) <- stream
        .withEffect(chunk => putStrLn(s"Read Chunk of size: ${chunk.length}"))
        .foldLeft((0, 0L))((acc, a) => (acc._1 + 1, acc._2 + a.length))
      _ <- putStrLn(s"Done: $count chunks, total bytes $totalSize")
    } yield ()
    program.redeem(
      error => putStrLn(error) *> IO.succeed(ExitStatus.ExitWhenDone(1, terminateWait)),
      Function.const(IO.succeed(ExitStatus.ExitWhenDone(0, terminateWait)))
    )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment