Create a gist now

Instantly share code, notes, and snippets.

File Async IO for Scala
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.Paths
import java.nio.file.StandardOpenOption._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
/**
 * File asynchronous IO wrapper around java.nio.file for scala Futures.
 *
 * Every operation opens its own [[java.nio.channels.AsynchronousFileChannel]], transfers the
 * whole payload (repeating the asynchronous call after a partial transfer) and closes the
 * channel before the returned future is completed.
 */
object FileAsyncIO {

  // Imported locally so the object does not rely on additional file-level imports.
  import scala.util.Failure
  import scala.util.control.NonFatal

  /**
   * Read a file
   * @param file file path
   * @param ec execution context
   * @return file contents as bytes; the future fails if the file cannot be opened or read,
   *         or if it is larger than `Int.MaxValue` bytes (it would not fit in one array)
   */
  def read(file: String)(implicit ec: ExecutionContext): Future[Array[Byte]] = {
    val p = Promise[Array[Byte]]()
    try {
      val channel = AsynchronousFileChannel.open(Paths.get(file), READ)
      try {
        val size = channel.size()
        // A blind `.toInt` would wrap around and either truncate the read or fail obscurely.
        if (size > Int.MaxValue) {
          throw new IOException(s"File is too large to read into a byte array: $file ($size bytes)")
        }
        val buffer = ByteBuffer.allocate(size.toInt)
        transferFully(channel, buffer, p)(
          (buf, pos, handler) => channel.read(buf, pos, buf, handler)
        )(buf => contents(buf))
      } catch {
        // Anything thrown before the asynchronous operation took over must not leak the channel.
        case t: Throwable =>
          closeSafely(channel)
          throw t
      }
    } catch {
      case NonFatal(t) => p.tryFailure(t)
    }
    p.future
  }

  /**
   * Read a text file
   * @param file file path
   * @param charsetName name of the charset used to decode the bytes (defaults to UTF-8)
   * @param ec execution context
   * @return file contents as string
   */
  def readText(file: String, charsetName: String = "UTF-8")
              (implicit ec: ExecutionContext): Future[String] =
    read(file).map(new String(_, charsetName))

  /**
   * Write into a file. The file is created when missing and truncated when it already
   * exists, so after completion it contains exactly `bytes`.
   * @param file file path
   * @param bytes file contents as bytes
   * @param ec execution context
   * @return unit, once all bytes are written and the channel is closed
   */
  def write(file: String, bytes: Array[Byte])(implicit ec: ExecutionContext): Future[Unit] = {
    val p = Promise[Unit]()
    try {
      // Wrap first: an invalid argument must fail before the target file gets truncated.
      val buffer = ByteBuffer.wrap(bytes)
      // TRUNCATE_EXISTING prevents stale trailing bytes when the new content is shorter.
      val channel = AsynchronousFileChannel.open(Paths.get(file), CREATE, WRITE, TRUNCATE_EXISTING)
      try {
        transferFully(channel, buffer, p)(
          (buf, pos, handler) => channel.write(buf, pos, buf, handler)
        )(_ => ())
      } catch {
        case t: Throwable =>
          closeSafely(channel)
          throw t
      }
    } catch {
      case NonFatal(t) => p.tryFailure(t)
    }
    p.future
  }

  /**
   * Write into a text file
   * @param file file path
   * @param s file contents as string
   * @param charsetName name of the charset used to encode the string (defaults to UTF-8)
   * @param ec execution context
   * @return unit
   */
  def writeText(file: String, s: String, charsetName: String = "UTF-8")
               (implicit ec: ExecutionContext): Future[Unit] =
    write(file, s.getBytes(charsetName))

  /** Closes the channel on a best-effort basis; an IOException raised by close is ignored. */
  private def closeSafely(channel: AsynchronousFileChannel): Unit =
    try {
      channel.close()
    } catch {
      case _: IOException => ()
    }

  /** Bytes actually read: the backing array, trimmed if end-of-file came before it was full. */
  private def contents(buffer: ByteBuffer): Array[Byte] =
    if (buffer.hasRemaining) java.util.Arrays.copyOf(buffer.array(), buffer.position())
    else buffer.array()

  /**
   * Repeats `transfer` until `buffer` has no remaining bytes or end-of-file is reported,
   * then closes `channel` and completes `p` with `result(buffer)`.
   *
   * The buffer position doubles as the file position because every transfer starts at
   * offset 0 of the file.
   */
  private def transferFully[A](channel: AsynchronousFileChannel, buffer: ByteBuffer, p: Promise[A])(
      transfer: (ByteBuffer, Long, CompletionHandler[Integer, ByteBuffer]) => Unit)(
      result: ByteBuffer => A): Unit = {

    // Close before completing so the file handle is released when the future fires.
    def finish(outcome: Try[A]): Unit = {
      closeSafely(channel)
      p.tryComplete(outcome)
      ()
    }

    def step(buf: ByteBuffer, handler: CompletionHandler[Integer, ByteBuffer]): Unit =
      try {
        transfer(buf, buf.position().toLong, handler)
      } catch {
        case NonFatal(t) => finish(Failure(t))
      }

    val handler = new CompletionHandler[Integer, ByteBuffer] {
      def completed(res: Integer, buf: ByteBuffer): Unit =
        // A negative count signals end-of-file; otherwise continue after a partial transfer.
        if (res.intValue < 0 || !buf.hasRemaining) finish(Try(result(buf)))
        else step(buf, this)

      def failed(t: Throwable, buf: ByteBuffer): Unit = finish(Failure(t))
    }

    step(buffer, handler)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment