Errors using Spark, Parquet and MultipleOutputs

When using MultipleOutputs with DeprecatedParquetOutputFormat from a Spark job, you see this error:

java.lang.NullPointerException
  at org.apache.hadoop.fs.Path.<init>(Path.java:105)
  at org.apache.hadoop.fs.Path.<init>(Path.java:94)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getDefaultWorkFile(DeprecatedParquetOutputFormat.java:69)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.access$100(DeprecatedParquetOutputFormat.java:36)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.<init>(DeprecatedParquetOutputFormat.java:89)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getRecordWriter(DeprecatedParquetOutputFormat.java:77)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs$InternalFileOutputFormat.getRecordWriter(ParquetAvroMultipleOutputs.java:537)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs.getRecordWriter(ParquetAvroMultipleOutputs.java:326)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs.getCollector(ParquetAvroMultipleOutputs.java:483)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs.collect(ParquetAvroMultipleOutputs.java:402)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputFormat$$anon$2.write(PartitionedOutputFormats.scala:124)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputFormat$$anon$2.write(PartitionedOutputFormats.scala:104)
  at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:96)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1199)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  at java.lang.Thread.run(Unknown Source)

The issue is that Spark never sets a configuration key that the Hadoop MR framework normally sets for each task: the work output path, stored under mapreduce.task.output.dir (mapred.work.output.dir in older Hadoop). DeprecatedParquetOutputFormat.getDefaultWorkFile resolves its output file relative to that path, so with the key unset it passes a null parent to the Path(parent, child) constructor, which throws the NPE above.
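
To see where the NPE actually fires, here is a minimal sketch (mine, not from the original report; the file name is a placeholder) of the lookup that getDefaultWorkFile performs on a Spark executor:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Fresh conf, as seen on a Spark executor: nothing has run Task#initialize,
// so the work output path key is absent.
val conf = new JobConf()

// Returns null because mapreduce.task.output.dir was never set.
val work: Path = FileOutputFormat.getWorkOutputPath(conf)

// Path(parent, child) dereferences the null parent: the NPE in the trace above.
val file = new Path(work, "part-00000.parquet")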

From org.apache.hadoop.mapred.Task#initialize(...), which Spark's own task runner never invokes:

Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
  if ((committer instanceof FileOutputCommitter)) {
    // Point the work output path at the committer's temporary task attempt dir
    FileOutputFormat.setWorkOutputPath(conf,
      ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
  } else {
    // No FileOutputCommitter: write directly to the final output path
    FileOutputFormat.setWorkOutputPath(conf, outputPath);
  }
}
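
Since Spark never calls Task#initialize, one workaround is to replicate the else-branch above yourself before the first record writer is created. A hedged sketch, assuming Hadoop 2's key name mapreduce.task.output.dir; the helper name ensureWorkOutputPath is mine, not code from this gist:

import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Sketch: make sure the work output path is present before any
// DeprecatedParquetOutputFormat record writer is created, e.g. at the top of
// a custom OutputFormat's getRecordWriter. This mirrors the
// non-FileOutputCommitter branch of Task#initialize; with a
// FileOutputCommitter you would point this at the task attempt path instead.
def ensureWorkOutputPath(conf: JobConf): Unit = {
  if (FileOutputFormat.getWorkOutputPath(conf) == null) {
    val outputPath = FileOutputFormat.getOutputPath(conf)
    if (outputPath != null) {
      // "mapred.work.output.dir" is the Hadoop 1 name for the same key.
      conf.set("mapreduce.task.output.dir", outputPath.toString)
    }
  }
}

Note that writing straight to the final output directory like this skips the committer's temporary task-attempt directory, so a failed task can leave partial files behind; it seems safer to treat this as a stopgap than as a proper fix.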