// run with "--conf spark.cleaner.referenceTracking=false"
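// e.g. (sketch -- the exact launch command depends on your cluster / master):
//   spark-shell --master yarn --conf spark.cleaner.referenceTracking=false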
// spin up our full set of executors
sc.parallelize(1 to 100, 100).map { x => Thread.sleep(1000); x}.collect()
// org.apache.spark.util.Utils isn't public API, so get at getConfiguredLocalDirs via reflection from the repl
def getLocalDirs(): Array[String] = {
  val clz = Class.forName("org.apache.spark.util.Utils")
  val conf = org.apache.spark.SparkEnv.get.conf
  val method = clz.getMethod("getConfiguredLocalDirs", conf.getClass())
  method.setAccessible(true)
  method.invoke(null, conf).asInstanceOf[Array[String]]
}
val localDirsByExec = sc.parallelize(1 to 100, 100).map { x =>
  org.apache.spark.SparkEnv.get.executorId -> getLocalDirs().mkString(",")
}.collect().toMap
localDirsByExec.foreach { println }
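// (sketch, not part of the original flow) it can also help to record which host each executor is on,
// so you know which machines to pull the copied files from later.  InetAddress.getLocalHost is just the
// plain JVM way to get the hostname; swap in whatever suits your cluster.
val hostsByExec = sc.parallelize(1 to 100, 100).map { _ =>
  org.apache.spark.SparkEnv.get.executorId ->
    (java.net.InetAddress.getLocalHost().getHostName(), getLocalDirs().mkString(","))
}.collect().toMap
hostsByExec.foreach { println }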
// put in original code here ...
// ... and when you notice the issue, do this:
// choose a path here which you know will exist on the executors, which yarn won't clean up,
// and which you can easily grab from later on
val targetDir = new java.io.File("/tmp/copiedSparkTmpData")
def showShuffleFiles(): Unit = {
  println(sc.parallelize(1 to 100, 100).map { _ =>
    import scala.collection.JavaConverters._
    val localDirs = getLocalDirs()
    val files = org.apache.spark.SparkEnv.get.synchronized {
      localDirs.flatMap { d =>
        org.apache.commons.io.FileUtils.listFiles(new java.io.File(d), null, true)
          .asScala
          .filter { _.getName().contains("shuffle") }
      }
    }
    org.apache.spark.SparkEnv.get.executorId -> files
  }.collect().toMap)
}
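// usage sketch: once the job in the "original code" section has produced shuffle data, just call
//   showShuffleFiles()
// from the shell to see each executor's shuffle files before moving them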
val shuffleFiles = sc.parallelize(1 to 100, 100).map { _ =>
  import scala.collection.JavaConverters._
  val localDirs = getLocalDirs()
  val targetDir = new java.io.File("/tmp/copiedSparkTmpData")
  // this task may get executed multiple times on each executor, or even concurrently by multiple tasks
  // within one executor.  locking the SparkEnv is not safe in general, but there doesn't seem to be a way
  // to create a real object shared by all tasks from the repl
  val moves = org.apache.spark.SparkEnv.get.synchronized {
    targetDir.mkdirs()
    localDirs.flatMap { d =>
      val sfs: Array[java.io.File] = org.apache.commons.io.FileUtils.listFiles(new java.io.File(d), null, true)
        .asScala
        .filter { _.getName().contains("shuffle") }.toArray
      sfs.map { sf =>
        val parentDirName = sf.getParentFile().getName()
        val target = new java.io.File(targetDir, parentDirName)
        target.mkdirs()
        val dest = new java.io.File(target, sf.getName())
        val name = sf.toString
        val success = sf.renameTo(dest)
        name -> (dest.toString, success)
      }
    }
  }
  org.apache.spark.SparkEnv.get.executorId -> moves
}.collect().toMap
println("moved shuffle files: " + shuffleFiles)
// very simple code to generate shuffle data to put into the "original code" section
sc.parallelize(1 to 1e6.toInt, 100).map { x => (x % 10) -> x }.reduceByKey { _ + _ }.collect()
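// (hypothetical follow-up) after the app finishes, the moved files survive yarn's cleanup, so you can pull
// them off each node by hand, e.g. something like:
//   scp -r <executor-host>:/tmp/copiedSparkTmpData ./copiedSparkTmpData-<executor-host>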