Skip to content

Instantly share code, notes, and snippets.

@scr
Last active January 13, 2018 23:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save scr/9d9fd533f02f7dcf4c4029ab55901412 to your computer and use it in GitHub Desktop.
Save scr/9d9fd533f02f7dcf4c4029ab55901412 to your computer and use it in GitHub Desktop.
package tinker.scalazEffect
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousChannel, AsynchronousFileChannel, CompletionHandler}
import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Path, Paths, StandardOpenOption}
import scalaz.data.Disjunction.{-\/, \/-}
import scalaz.effect.IO.IOTrampoline
import scalaz.effect.IO.IOTrampoline.{IOTDone, IOTMore}
import scalaz.effect.console._
import scalaz.effect.{IO, SafeApp}
object FileMain extends SafeApp {
def closeAC(ac: AsynchronousChannel): IO[Unit] = {
ac.close()
IO.unit
}
def readAllBytes(path: Path): IO[Array[Byte]] = {
IO.sync(AsynchronousFileChannel.open(path, StandardOpenOption.READ)).bracket(closeAC) { afc =>
for {
afcSize <- IO.sync {
val size = afc.size()
require(size <= Int.MaxValue)
size.asInstanceOf[Int]
}
bufferSize <- IO.now(1024 * 1024)
range <- IO.point(0 until afcSize by bufferSize)
buffer <- IO.sync(ByteBuffer.allocate(1024 * 1024))
bytes <- IO.sync(new Array[Byte](afcSize))
ret <- IO.suspend {
def readMore(position: Int): IO[IO[Array[Byte]]] = {
IO.async[IO[Array[Byte]]] { cb =>
// TODO(scr): How can this be more IO-savvy?
val completionHandler: CompletionHandler[Integer, Integer] = new CompletionHandler[Integer, Integer] {
override def failed(exc: Throwable, attachment: Integer): Unit = cb(-\/(exc))
override def completed(numRead: Integer, position: Integer): Unit = {
buffer.flip()
buffer.get(bytes, position, numRead)
val nextPosition = position + numRead
if (nextPosition >= afcSize) {
cb(\/-(IO.point(bytes)))
} else {
cb(\/-(IO.flatten(readMore(nextPosition))))
}
}
}
buffer.clear()
afc.read[Integer](buffer, position, position, completionHandler)
}
}
if (bufferSize >= 0) IO.flatten(readMore(0)) else IO.point(bytes)
}
} yield ret
}
}
def bytesToString(bytes: Array[Byte], charset: Charset = StandardCharsets.UTF_8): IO[String] = {
IO.sync(new String(bytes, charset))
}
override def run(args: List[String]): IO[Unit] = {
for {
fooPath <- IO.point(Paths.get("foo.txt"))
bytes <- readAllBytes(fooPath)
s <- bytesToString(bytes)
_ <- putStrLn(s"out $s")
} yield ()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment