Skip to content

Instantly share code, notes, and snippets.

Created August 25, 2013 05:34
Show Gist options
  • Save johnynek/6332191 to your computer and use it in GitHub Desktop.
Save johnynek/6332191 to your computer and use it in GitHub Desktop.
IO Monad for the HDFS FileSystem: build your HdfsIO[T] value (for example `op`), put an implicit FileSystem in scope, then call `HdfsIO.run(op)`. TOTALLY UNTESTED! (but it does compile)
import{FileNotFoundException, IOException}
import org.apache.hadoop.fs.{
/** IO Monad for HDFS */
sealed trait HdfsIO[+T] {
def map[U](fn: T => U): HdfsIO[U] =
def flatMap[U](fn: T => HdfsIO[U]): HdfsIO[U] =
FlatMapped(this, fn)
def recover[U >: T](rec: IOException => U): HdfsIO[U] =
def recoverWith[U >: T](rec: IOException => HdfsIO[U]): HdfsIO[U] =
Recover(this, rec)
/** Primitive step: an action applied directly to the `FileSystem` when run. */
case class FSOp[+T](effect: FileSystem => T) extends HdfsIO[T]

/** Sequencing node: run `prev`, then pass its result to `next` to get the rest of the program. */
case class FlatMapped[U, T](prev: HdfsIO[U], next: U => HdfsIO[T]) extends HdfsIO[T]

/** Error-handling node: run `prev`; if it fails with an `IOException`, continue with `rec`. */
case class Recover[U, T](prev: HdfsIO[U], rec: IOException => HdfsIO[T]) extends HdfsIO[T]
object HdfsIO {
/** Evaluates `t` exactly once, capturing an `IOException` it throws.
  *
  * Any other exception propagates unchanged.
  *
  * @param t by-name computation to evaluate
  * @return `Right(result)` on success, or `Left(ioe)` if `t` threw an `IOException`
  *         (including subclasses such as `FileNotFoundException`)
  */
def maybeError[T](t: => T): Either[IOException, T] =
  try Right(t)
  catch { case ioe: IOException => Left(ioe) }
/** Lets a plain `String` be used wherever a Hadoop `Path` is expected.
  *
  * NOTE(review): an implicit conversion from `String` applies very broadly;
  * Scala 2.10+ also emits a feature warning for it unless
  * `scala.language.implicitConversions` is imported.
  */
implicit def pathFromString(path: String): Path =
  new Path(path)
/** returns false if the file already exists */
def createNewEmptyFile(path: Path): HdfsIO[Boolean] =
/** Creates `path` and opens it for writing.
  *
  * `create` is called with its overwrite flag set to `false`, so the action
  * fails with an exception, which callers should `recover` from, rather than
  * replacing an existing file.
  */
def createNewFile(path: Path): HdfsIO[FSDataOutputStream] =
  FSOp { fs =>
    val overwrite = false
    fs.create(path, overwrite)
  }
/** Delete a path when the FileSystem is closed. Useful
* for cleaning up temp files
def deleteOnExit(path: Path): HdfsIO[Boolean] =
/** Deletes a single file; equivalent to `delete(path, false)`.
  *
  * The delete is non-recursive, so it fails if `path` is a directory.
  */
def deleteFile(path: Path): HdfsIO[Boolean] =
  FSOp { fs => fs.delete(path, /* recursive = */ false) }
/** Recursively deletes a directory and everything beneath it;
  * equivalent to `delete(path, true)`.
  */
def deleteDir(path: Path): HdfsIO[Boolean] =
  FSOp { fs => fs.delete(path, /* recursive = */ true) }
def exists(path: Path): HdfsIO[Boolean] =
def getFileStatus(path: Path): HdfsIO[Either[FileNotFoundException, FileStatus]] =
FSOp({fs =>
try {
catch {
case fnf: FileNotFoundException => Left(fnf)
def getHomeDir: HdfsIO[Path] =
def isFile(p: Path): HdfsIO[Boolean] =
def mkdirs(path: Path): HdfsIO[Boolean] =
def open(path: Path): HdfsIO[FSDataInputStream] =
/** Opens `path` for writing, creating it if absent and overwriting it if it
  * already exists (`create` with the overwrite flag set to `true`).
  */
def overwriteFile(path: Path): HdfsIO[FSDataOutputStream] =
  FSOp { fs => fs.create(path, /* overwrite = */ true) }
/** Renames `src` to `dest`, yielding the `Boolean` returned by `FileSystem.rename`. */
def rename(src: Path, dest: Path): HdfsIO[Boolean] =
  FSOp { fs => fs.rename(src, dest) }
/** Lifts a computation into `HdfsIO` without using the file system.
  *
  * `t` is by-name: it is evaluated when the resulting action is run, not when
  * `pure` is called. Because it is evaluated as an `FSOp` effect, an
  * `IOException` it throws can be handled by `recover` / `recoverWith`.
  */
def pure[T](t: => T): FSOp[T] =
  FSOp { _ => t }
/** Actually run an entire file system operation */
/** Actually run an entire file system operation against the implicit `fs`.
  *
  * The program is interpreted with an explicit stack of pending frames instead
  * of JVM recursion, so long `flatMap` chains run in constant stack space:
  *  - `FlatMapped` pushes its continuation as a `Right` frame,
  *  - `Recover` pushes its handler as a `Left` frame,
  *  - `FSOp` runs its effect. A success is passed to the nearest continuation
  *    (handlers are skipped); an `IOException` is passed to the nearest
  *    handler (continuations are skipped).
  *
  * An `IOException` thrown by a continuation or by a recovery handler is
  * treated like one thrown by an effect, so an enclosing `recover` /
  * `recoverWith` can handle it. Other exceptions propagate unchanged.
  *
  * @param h  the program to run
  * @param fs file system that every `FSOp` effect is applied to
  * @return the program's final value
  * @throws IOException if an `IOException` is not handled by any recovery frame
  */
def run[T](h: HdfsIO[T])(implicit fs: FileSystem): T = {
  // Left = recovery handler (from Recover), Right = continuation (from FlatMapped).
  type Frame = Either[IOException => HdfsIO[Any], Any => HdfsIO[Any]]

  /* Passes `outcome` to the first frame on `stack` that accepts it.
   * Returns Left(value) when the whole program has finished, or
   * Right((next, rest)) with the program produced by that frame and the
   * frames still pending. Rethrows the failure if no handler is left. */
  @scala.annotation.tailrec
  def unwind(outcome: Either[IOException, Any],
             stack: List[Frame]): Either[Any, (HdfsIO[Any], List[Frame])] =
    (outcome, stack) match {
      case (Right(value), Nil) => Left(value)
      case (Left(err), Nil) => throw err
      case (Right(value), Right(next) :: rest) =>
        // Guard the continuation too, so its IOException is recoverable.
        maybeError(next(value)) match {
          case Right(prog) => Right((prog, rest))
          case Left(err) => unwind(Left(err), rest)
        }
      case (Left(err), Left(rec) :: rest) =>
        // A handler that itself fails is passed on to outer handlers.
        maybeError(rec(err)) match {
          case Right(prog) => Right((prog, rest))
          case Left(nextErr) => unwind(Left(nextErr), rest)
        }
      case (_, _ :: rest) =>
        // A handler on success, or a continuation on failure: not applicable here.
        unwind(outcome, rest)
    }

  @scala.annotation.tailrec
  def loop(cur: HdfsIO[Any], stack: List[Frame]): Any =
    cur match {
      case FSOp(effect) =>
        unwind(maybeError(effect(fs)), stack) match {
          case Left(result) => result
          case Right((next, rest)) => loop(next, rest)
        }
      case FlatMapped(p, n) => loop(p, Right(n) :: stack)
      case Recover(p, n) => loop(p, Left(n) :: stack)
    }

  loop(h, Nil).asInstanceOf[T]
}
Copy link

this looks so cool

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment