Skip to content

Instantly share code, notes, and snippets.

Created November 27, 2014 03:10
Show Gist options
  • Save marmbrus/15e72f7bc22337cf6653 to your computer and use it in GitHub Desktop.
Save marmbrus/15e72f7bc22337cf6653 to your computer and use it in GitHub Desktop.
Parallel file listing on S3 with Spark
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
/**
 * A single entry produced by the directory-listing helpers below.
 *
 * @param path  path string of the entry (listing helpers fill it from `FileStatus.getPath.toString`)
 * @param isDir whether the entry is a directory
 * @param size  length reported by `FileStatus.getLen`; synthetic root entries use 0
 */
case class S3File(path: String, isDir: Boolean, size: Long) {

  /** Lists the immediate entries under this path using the top-level `listFiles` helper. */
  def children: Seq[S3File] = listFiles(path)
}
/**
 * Lists the immediate contents of a single directory (non-recursive), on the driver.
 *
 * The Hadoop FileSystem is resolved from the path's own scheme/authority, so the same
 * code works for `s3n://`, `s3a://`, `hdfs://`, or local paths alike.
 *
 * @param path directory to list
 * @return one [[S3File]] per status returned by `FileSystem.listStatus`
 * @throws java.io.FileNotFoundException if `path` does not exist (propagated from Hadoop)
 */
def listFiles(path: String): Seq[S3File] = {
  val hadoopPath = new Path(path)
  val fs = hadoopPath.getFileSystem(new Configuration())
  // listStatus returns an Array; convert explicitly rather than relying on the implicit
  // Array-to-Seq conversion, which is deprecated for immutable Seq in Scala 2.13.
  fs.listStatus(hadoopPath).toSeq.map(s => S3File(s.getPath.toString, s.isDir, s.getLen))
}
/** Convenience overload: recursively lists every file beneath a single root directory. */
def recursivelyListFiles(path: String): Seq[S3File] = recursivelyListFiles(Seq(path))
/**
 * Recursively lists every file under the given root directories, spreading the listing
 * work across the Spark cluster one directory level at a time.
 *
 * Each round parallelizes the current frontier of directories, lists them on the
 * executors, collects the entries back to the driver, and uses the subdirectories found
 * as the next frontier. The walk ends when a round discovers no further directories.
 * Listing a whole level in parallel is the point of this helper: on stores such as S3,
 * a sequential walk issues one listing call after another.
 *
 * Directories that cannot be found are reported with `println` and skipped.
 *
 * @param paths root directories to walk
 * @return all non-directory entries found beneath `paths` (directories are not included)
 */
def recursivelyListFiles(paths: Seq[String]): Seq[S3File] = {

  // Lists one level of directories on the executors and returns their entries to the driver.
  def listLevel(dirs: Seq[String]): Array[S3File] =
    sparkContext.parallelize(dirs).mapPartitions { iter =>
      // Configuration is not serializable, so it is built on the executor, once per partition.
      // NOTE(review): a fresh Configuration does not see credentials/settings placed on the
      // driver's sparkContext.hadoopConfiguration — confirm executors pick up S3 keys.
      val conf = new Configuration()
      iter.flatMap { path =>
        val hadoopPath = new Path(path)
        try {
          // Resolve the FileSystem from each path so roots on different buckets/schemes work.
          hadoopPath.getFileSystem(conf).listStatus(hadoopPath).toSeq
            .map(s => S3File(s.getPath.toString, s.isDir, s.getLen))
        } catch {
          case _: java.io.FileNotFoundException =>
            println(s"File $path not found.")
            Seq.empty[S3File]
        }
      }
    }.collect()

  // Breadth-first walk: each call processes one full level of the directory tree.
  @scala.annotation.tailrec
  def loop(pending: Seq[String], found: Vector[S3File]): Vector[S3File] =
    if (pending.isEmpty) found
    else {
      val (dirs, files) = listLevel(pending).partition(_.isDir)
      loop(dirs.map(_.path).toSeq, found ++ files)
    }

  loop(paths, Vector.empty)
}
Copy link

Andimeo commented Sep 12, 2018

Why not just use `val allFiles = fs.listFiles(path, true)`?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment