Parallel list files on S3 with Spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// A single S3 object or "directory" (prefix), as reported by the Hadoop FileSystem API.
case class S3File(path: String, isDir: Boolean, size: Long) {
  def children: Seq[S3File] = listFiles(path)
}

// List the immediate children of a path, running on the driver.
def listFiles(path: String): Seq[S3File] = {
  val fs = FileSystem.get(new java.net.URI(path), new Configuration())
  fs.listStatus(new Path(path)).map(s => S3File(s.getPath.toString, s.isDirectory, s.getLen))
}

def recursivelyListFiles(path: String): Seq[S3File] = recursivelyListFiles(path :: Nil)

// Breadth-first listing: each round distributes the directories found so far across
// the cluster, lists them in parallel, and collects the results on the driver.
def recursivelyListFiles(paths: Seq[String]): Seq[S3File] = {
  val remainingDirectories = new scala.collection.mutable.ArrayBuffer[S3File]
  val allFiles = new scala.collection.mutable.ArrayBuffer[S3File]
  remainingDirectories ++= paths.map(S3File(_, isDir = true, 0))
  while (remainingDirectories.nonEmpty) {
    // `sparkContext` is the ambient SparkContext (e.g. `sc` in spark-shell).
    val newDirs = sparkContext.parallelize(remainingDirectories.map(_.path))
    val currentBatch = newDirs.mapPartitions { iter =>
      // One FileSystem per partition; this assumes all paths live on the same
      // filesystem (same scheme and bucket) as the first input path.
      val fs = FileSystem.get(new java.net.URI(paths.head), new Configuration())
      iter.flatMap { path =>
        try {
          fs.listStatus(new Path(path)).map(s => S3File(s.getPath.toString, s.isDirectory, s.getLen))
        } catch {
          case e: java.io.FileNotFoundException =>
            // The directory disappeared between rounds; logged on the executor, skipped here.
            println(s"File $path not found.")
            Nil
        }
      }
    }.collect()
    // Directories feed the next round; plain files are accumulated.
    val (dirs, files) = currentBatch.partition(_.isDir)
    remainingDirectories.clear()
    remainingDirectories ++= dirs
    allFiles ++= files
  }
  allFiles
}
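
A minimal usage sketch, assuming a running spark-shell (or notebook) where the code above has been pasted and a SparkContext is bound as `sparkContext`; the bucket and prefix below are hypothetical placeholders:

// Hypothetical example: list everything under a prefix and summarise the result.
val files = recursivelyListFiles("s3a://my-bucket/some/prefix/")
println(s"Found ${files.size} files, ${files.map(_.size).sum} bytes in total")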