Skip to content

Instantly share code, notes, and snippets.

@johnynek
Created August 25, 2013 05:34
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johnynek/6332191 to your computer and use it in GitHub Desktop.
Save johnynek/6332191 to your computer and use it in GitHub Desktop.
IO Monad for the HDFS FileSystem (http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/fs/FileSystem.html). Build your HdfsIO[T] value (for example `op`), put an implicit FileSystem in scope, then call HdfsIO.run(op). TOTALLY UNTESTED! (but it does compile)
import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.fs.{
FileSystem,
FileStatus,
FSDataInputStream,
FSDataOutputStream,
Path
}
/** IO Monad for HDFS.
  *
  * An HdfsIO[T] describes a sequence of FileSystem operations producing a T.
  * Building one performs no I/O; pass it to `HdfsIO.run` to execute it.
  */
sealed trait HdfsIO[+T] {
  /** Transforms the result with `fn`.
    *
    * `fn` is applied inside an FSOp step (through the by-name `HdfsIO.pure`), so an
    * IOException it throws is subject to an enclosing `recover`/`recoverWith`,
    * just like one thrown by a FileSystem call.
    */
  def map[U](fn: T => U): HdfsIO[U] =
    flatMap[U](t => HdfsIO.pure(fn(t)))

  /** Sequences this action with `fn`, which picks the next action from this one's result. */
  def flatMap[U](fn: T => HdfsIO[U]): HdfsIO[U] =
    FlatMapped(this, fn)

  /** If this action raises an IOException, produce the value `rec` returns instead.
    *
    * Like `map`, `rec` runs inside an FSOp step, so an IOException it throws can
    * itself be handled by an outer `recover`/`recoverWith`.
    */
  def recover[U >: T](rec: IOException => U): HdfsIO[U] =
    recoverWith[U](e => HdfsIO.pure(rec(e)))

  /** If this action raises an IOException, continue with the action `rec` returns. */
  def recoverWith[U >: T](rec: IOException => HdfsIO[U]): HdfsIO[U] =
    Recover(this, rec)
}
/** Primitive step: `HdfsIO.run` applies `effect` to its FileSystem. */
case class FSOp[+A](effect: FileSystem => A) extends HdfsIO[A]

/** Sequencing node: run `prev`, then feed its result to `next`. */
case class FlatMapped[A, B](
  prev: HdfsIO[A],
  next: A => HdfsIO[B]
) extends HdfsIO[B]

/** Error-handling node: run `prev`; if it raises an IOException, continue with `rec`. */
case class Recover[A, B](
  prev: HdfsIO[A],
  rec: IOException => HdfsIO[B]
) extends HdfsIO[B]
/** Constructors for HdfsIO actions, plus `run`, the interpreter that executes them. */
object HdfsIO {
  /** Evaluates `t`, turning a thrown IOException into a Left.
    * Any other Throwable propagates unchanged.
    */
  def maybeError[T](t: => T): Either[IOException, T] =
    try { Right(t) }
    catch {
      case x: IOException => Left(x)
    }

  /** Lets a String be used where a Path is expected, when this implicit is in
    * scope (for example via `import HdfsIO._`).
    */
  implicit def pathFromString(path: String): Path = new Path(path)

  /** Creates an empty file; returns false if the file already exists. */
  def createNewEmptyFile(path: Path): HdfsIO[Boolean] =
    FSOp(_.createNewFile(path))

  /** Creates a new file and returns a stream for writing it (create(path, false)).
    * Gives an exception if the file already exists, which you should recover from
    * with `recover`/`recoverWith` (those only see IOExceptions).
    */
  def createNewFile(path: Path): HdfsIO[FSDataOutputStream] =
    FSOp(_.create(path, false))

  /** Delete a path when the FileSystem is closed. Useful
    * for cleaning up temp files.
    */
  def deleteOnExit(path: Path): HdfsIO[Boolean] =
    FSOp(_.deleteOnExit(path))

  /** Same as delete(path, false); fails if path is a directory. */
  def deleteFile(path: Path): HdfsIO[Boolean] =
    FSOp(_.delete(path, false))

  /** Recursively deletes a directory; same as delete(path, true). */
  def deleteDir(path: Path): HdfsIO[Boolean] =
    FSOp(_.delete(path, true))

  /** FileSystem.exists(path). */
  def exists(path: Path): HdfsIO[Boolean] =
    FSOp(_.exists(path))

  /** The FileStatus of `path`, or a Left holding the FileNotFoundException thrown by
    * getFileStatus. Any other IOException still fails the action.
    */
  def getFileStatus(path: Path): HdfsIO[Either[FileNotFoundException, FileStatus]] =
    FSOp { fs =>
      try Right(fs.getFileStatus(path))
      catch {
        case fnf: FileNotFoundException => Left(fnf)
      }
    }

  /** FileSystem.getHomeDirectory. */
  def getHomeDir: HdfsIO[Path] =
    FSOp(_.getHomeDirectory)

  /** FileSystem.isFile(p). */
  def isFile(p: Path): HdfsIO[Boolean] =
    FSOp(_.isFile(p))

  /** FileSystem.mkdirs(path). */
  def mkdirs(path: Path): HdfsIO[Boolean] =
    FSOp(_.mkdirs(path))

  /** Opens `path` for reading. */
  def open(path: Path): HdfsIO[FSDataInputStream] =
    FSOp(_.open(path))

  /** Overwrite OR create a new file; same as create(path, true). */
  def overwriteFile(path: Path): HdfsIO[FSDataOutputStream] =
    FSOp(_.create(path, true))

  /** Renames `src` to `dest`. */
  def rename(src: Path, dest: Path): HdfsIO[Boolean] =
    FSOp(_.rename(src, dest))

  ///////////////////////////////

  /** Lifts a value into HdfsIO without touching the FileSystem.
    * `t` is by-name: it is evaluated when the action runs, so an IOException it
    * throws goes through the same recovery path as a FileSystem failure.
    */
  def pure[T](t: => T): FSOp[T] = FSOp(_ => t)

  /** Actually runs an entire file system operation against the implicit FileSystem.
    *
    * FlatMapped/Recover nodes are pushed onto an explicit stack and the interpreter
    * loop is tail-recursive, so deep chains do not grow the JVM stack. An IOException
    * thrown by any step — an FSOp effect, a flatMap continuation, or a recover
    * handler — is routed to the nearest enclosing Recover; with none left it is
    * rethrown.
    *
    * @throws IOException if a step fails and no recover handler is left to catch it
    */
  def run[T](h: HdfsIO[T])(implicit fs: FileSystem): T = {
    type Handler = IOException => HdfsIO[Any]
    type Cont = Any => HdfsIO[Any]

    // An action that fails with `err` when run; used to re-enter the error path.
    def failWith(err: IOException): HdfsIO[Nothing] = FSOp[Nothing](_ => throw err)

    // Builds the next action. An IOException thrown while building it (i.e. by user
    // code in a continuation or handler) becomes a failed step instead of escaping.
    def step(next: => HdfsIO[Any]): HdfsIO[Any] =
      maybeError(next) match {
        case Right(io) => io
        case Left(err) => failWith(err)
      }

    @annotation.tailrec
    def loop(cur: HdfsIO[Any], stack: List[Either[Handler, Cont]]): Any =
      cur match {
        case FSOp(effect) =>
          // Match on the frame itself; the original cast the Either wrapper to a
          // Function1, which throws ClassCastException at runtime.
          (maybeError(effect(fs)), stack) match {
            case (Right(r), Nil) => r
            case (Left(err), Nil) => throw err
            case (Right(r), Right(k) :: rest) => loop(step(k(r)), rest)
            // Success: recover handlers do not apply, so pass the value through.
            case (Right(r), Left(_) :: rest) => loop(pure(r), rest)
            case (Left(err), Left(handler) :: rest) => loop(step(handler(err)), rest)
            // Failure: skip continuations until a handler is reached.
            case (Left(err), Right(_) :: rest) => loop(failWith(err), rest)
          }
        case FlatMapped(p, n) => loop(p, Right(n) :: stack)
        case Recover(p, n) => loop(p, Left(n) :: stack)
      }

    loop(h, Nil).asInstanceOf[T]
  }
}
@softprops
Copy link

this looks so cool

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment