Skip to content

Instantly share code, notes, and snippets.

@zygm0nt
Created January 26, 2018 10:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zygm0nt/b9f4c65ecf4d187099e5586ee6a2839b to your computer and use it in GitHub Desktop.
Save zygm0nt/b9f4c65ecf4d187099e5586ee6a2839b to your computer and use it in GitHub Desktop.
Repartition (compact) directories of many small files on HDFS into fewer, snappy-compressed text files using Spark.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
object App {

  /** Lists the immediate children of `path` on the filesystem configured in
    * the given [[SparkSession]]'s Hadoop configuration.
    *
    * @param path directory to list
    * @param ss   active Spark session (supplies the Hadoop configuration)
    * @return fully-qualified path strings of the entries directly under `path`
    */
  def lsFiles(path: String, ss: SparkSession): Seq[String] = {
    FileSystem.get(ss.sparkContext.hadoopConfiguration).listStatus(new Path(path)).toSeq.map {
      _.getPath.toString
    }
  }

  /** Rewrites the text data under `path` to `out` as `numPartitions`
    * snappy-compressed text files.
    *
    * @param path          input path read as text
    * @param out           output path for the compacted files
    * @param ss            active Spark session
    * @param numPartitions target number of output files (default 50, the
    *                      previously hard-coded value)
    */
  def repackageFile(path: String, out: String, ss: SparkSession, numPartitions: Int = 50): Unit = {
    // coalesce (not repartition) avoids a full shuffle; adequate since we only
    // ever reduce the partition count here
    ss.read.textFile(path).coalesce(numPartitions).write.option("compression", "snappy").text(out)
  }

  /** Walks the directories directly under `path` and repackages each one that
    * contains more than `minFiles` entries, writing the compacted output to
    * `outPrefix` + the directory's base name.
    *
    * @param path      parent directory whose sub-directories are scanned
    * @param outPrefix prefix prepended to each output directory name
    * @param ss        active Spark session
    * @param minFiles  only repackage sub-dirs with more than this many files
    *                  (default 10, the previously hard-coded value)
    */
  def process(path: String, outPrefix: String, ss: SparkSession, minFiles: Int = 10): Unit = {
    lsFiles(path, ss).foreach { sPath =>
      val children = lsFiles(sPath, ss)
      if (children.size > minFiles) {
        // Log the directory being repackaged; the original interpolated the
        // whole Seq of child paths, which dumped a huge listing per directory.
        println(s"repackaging $sPath")
        val output = outPrefix + sPath.split("/").last
        repackageFile(sPath, output, ss)
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment