Skip to content

Instantly share code, notes, and snippets.

@JoshRosen
Created September 1, 2015 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JoshRosen/1479af6aebc4512fc41b to your computer and use it in GitHub Desktop.
javaHome := Some(file("/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home"))
libraryDependencies ++= Seq(
//"org.apache.hadoop" % "hadoop-core" % "1.2.1",
"org.apache.spark" %% "spark-core" % "1.4.1"
)
import org.apache.spark._
object Main {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().set("spark.speculation", "true")
val sc = new SparkContext("local[2, 4]", "test", conf)
//sc.setLogLevel("DEBUG")
sc.hadoopConfiguration.set("mapred.output.committer.class", classOf[MyOutputCommitter].getCanonicalName)
val tempDir = java.nio.file.Files.createTempDirectory("outputcommitter-test")
java.nio.file.Files.delete(tempDir)
sc.parallelize(1 to 10000, 4).map(x => x.toString).saveAsTextFile(tempDir.toFile.getAbsolutePath)
}
}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.TaskAttemptContext
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.spark.TaskContext
class MyOutputCommitter() extends FileOutputCommitter() {
def init(): Unit = {}
override def commitTask(context: TaskAttemptContext): Unit = {
val ctx = TaskContext.get()
if (ctx.attemptNumber < 3 && ctx.partitionId == 1) {
throw new RuntimeException("!!!!!")
// throw new java.io.FileNotFoundException("Dummy")
}
super.commitTask(context)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment