Skip to content

Instantly share code, notes, and snippets.

@ssimeonov
Last active July 26, 2016 23:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ssimeonov/3fa68144d4be4d94521a84f91e28669d to your computer and use it in GitHub Desktop.
Save ssimeonov/3fa68144d4be4d94521a84f91e28669d to your computer and use it in GitHub Desktop.
Distributed file listing using Spark and the Hadoop file system APIs
/**
 * One entry returned by a file listing.
 *
 * @param path             fully qualified path of the entry, as rendered by Hadoop `Path.toString`
 * @param parent           the directory string that was listed to discover this entry
 * @param isDir            whether the entry is a directory
 * @param size             length reported by the file system
 * @param modificationTime modification time reported by the file system
 *                         (presumably epoch millis — `filesDF` divides it by 1000)
 * @param partitions       `key=value` pairs parsed from the path segments
 */
case class FInfo(
  path: String,
  parent: String,
  isDir: Boolean,
  size: Long,
  modificationTime: Long,
  partitions: Map[String, String]) {

  /** Plain suffix test on `path`; `ext` is matched verbatim (pass ".json" to require the dot). */
  // @todo encoding issues — the path is compared as-is, with no decoding of escaped characters
  def hasExt(ext: String): Boolean = path.endsWith(ext)

  /** True when `path` ends with `str`. */
  def endsWith(str: String): Boolean = path.endsWith(str)
}
/**
 * Extracts `key=value` partition pairs from the `/`-separated segments of `path`.
 *
 * Each segment that contains `=` is split at its FIRST `=` only. This means a value
 * may itself contain `=` (`q=a=b` gives `q -> "a=b"`), and an empty value (`hr=`)
 * maps to `""` instead of throwing. When the same key appears in several segments,
 * the right-most (deepest) segment wins.
 *
 * NOTE(review): values are returned verbatim; any escaping in the path
 * (e.g. Hive-style `%3A`) is not decoded.
 *
 * @param path a file or directory path, e.g. `hdfs://nn/tbl/dt=2016-07-26/hr=05/part-0`
 * @return the partition map, empty when no segment contains `=`
 */
def getPartitions(path: String): Map[String, String] =
  path.split('/')
    .filter(_.contains('='))
    .map { segment =>
      // limit = 2: keep a trailing empty value and leave any later '=' inside the value
      val kv = segment.split("=", 2)
      kv(0) -> kv(1)
    }
    .toMap
/**
 * Recursively lists every file under `paths`, spreading each directory level of the
 * traversal across the Spark cluster.
 *
 * Each iteration parallelizes the current frontier of directories and lists them on the
 * executors with the Hadoop `FileSystem` API. It then collects the results to the driver
 * and uses the sub-directories found as the next frontier. The loop stops when a level
 * yields no further directories.
 *
 * @param paths root paths as Hadoop path strings; roots may live on different
 *              file systems (schemes/authorities)
 * @return the non-directory entries found under `paths`. A path that raises
 *         `FileNotFoundException` when listed is skipped, with a message printed on
 *         the executor that listed it.
 */
def collectFiles(paths: Seq[String]): Seq[FInfo] = {
  val allFiles = new ArrayBuffer[FInfo]
  var frontier: Seq[String] = paths
  while (frontier.nonEmpty) {
    val currentBatch = sc.parallelize(frontier).mapPartitions { iter =>
      // Built on the executor (once per partition) rather than shipped from the driver.
      val conf = new Configuration()
      iter.flatMap { dir =>
        val dirPath = new Path(dir)
        try {
          // Resolve the file system from this path itself, not from the first root,
          // so roots on different schemes/authorities are each listed correctly.
          val fs = dirPath.getFileSystem(conf)
          fs.listStatus(dirPath).toSeq.map { s =>
            val child = s.getPath.toString
            FInfo(child, dir, s.isDirectory, s.getLen, s.getModificationTime, getPartitions(child))
          }
        } catch {
          case _: java.io.FileNotFoundException =>
            println(s"File $dir not found.")
            Seq.empty[FInfo]
        }
      }
    }.collect()
    val (dirs, files) = currentBatch.partition(_.isDir)
    allFiles ++= files
    frontier = dirs.map(_.path).toSeq
  }
  allFiles
}
/** Single-root convenience overload: recursively lists the files under `path`. */
def collectFiles(path:String): Seq[FInfo] =
  collectFiles(Seq(path))
/**
 * Turns a file listing into a DataFrame sorted by path, with columns
 * `path`, `size`, `ts_utc`, `isDir`, `partitions`.
 *
 * `ts_utc` is `from_unixtime(modificationTime/1000)`, i.e. the modification time is
 * treated as milliseconds and rendered as seconds.
 * NOTE(review): `from_unixtime` formats in the session/JVM timezone, so the column is
 * only truly UTC when that timezone is UTC — confirm for the target cluster.
 */
def filesDF(files: Seq[FInfo]): DataFrame = {
  val withTimestamp = files.toDF()
    .withColumn("ts_utc", expr("from_unixtime(modificationTime/1000)"))
  val projected = withTimestamp.select("path", "size", "ts_utc", "isDir", "partitions")
  projected.sort("path")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment