Errors using Spark, Parquet and MultipleOutputs

When using MultipleOutputs with DeprecatedParquetOutputFormat from Spark, you see this error:

java.lang.NullPointerException
  at org.apache.hadoop.fs.Path.<init>(Path.java:105)
  at org.apache.hadoop.fs.Path.<init>(Path.java:94)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getDefaultWorkFile(DeprecatedParquetOutputFormat.java:69)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.access$100(DeprecatedParquetOutputFormat.java:36)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.<init>(DeprecatedParquetOutputFormat.java:89)
  at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getRecordWriter(DeprecatedParquetOutputFormat.java:77)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs$InternalFileOutputFormat.getRecordWriter(ParquetAvroMultipleOutputs.java:537)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs.getRecordWriter(ParquetAvroMultipleOutputs.java:326)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs.getCollector(ParquetAvroMultipleOutputs.java:483)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputs.collect(ParquetAvroMultipleOutputs.java:402)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputFormat$$anon$2.write(PartitionedOutputFormats.scala:124)
  at com.bloomberg.bdip.ParquetAvroMultipleOutputFormat$$anon$2.write(PartitionedOutputFormats.scala:104)
  at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:96)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1199)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  at java.lang.Thread.run(Unknown Source)

The issue is that Spark does not set a configuration key in conf that the Hadoop MR framework normally sets for each task: the work output path ("mapred.work.output.dir", mapped to "mapreduce.task.output.dir" in the new API). DeprecatedParquetOutputFormat.getDefaultWorkFile resolves its output file against FileOutputFormat.getWorkOutputPath(conf); with the key unset that call returns null, and the null parent passed to Path's constructor raises the NullPointerException above. A workaround sketch follows the Task#initialize snippet below.
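To confirm this from inside a task, log the work output path before anything is written; under Hadoop MR it points at the task-attempt directory, while under Spark's saveAsHadoopDataset it comes back null. A minimal diagnostic sketch, assuming access to the executor-side JobConf (WorkOutputPathCheck is a hypothetical name):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public final class WorkOutputPathCheck {
  private WorkOutputPathCheck() {}

  // Logs the value that DeprecatedParquetOutputFormat later uses as the
  // parent directory of its work file; null here means the write will NPE.
  public static void log(JobConf conf) {
    Path work = FileOutputFormat.getWorkOutputPath(conf);
    System.out.println("work output path = " + work);
  }
}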

The key is normally set by the MR framework in org.apache.hadoop.mapred.Task#initialize(...):

Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
  if ((committer instanceof FileOutputCommitter)) {
    FileOutputFormat.setWorkOutputPath(conf, 
      ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
  } else {
    FileOutputFormat.setWorkOutputPath(conf, outputPath);
  }
}
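
One workaround is to set the key yourself before the record writer is created, mirroring the non-FileOutputCommitter branch above. A minimal sketch, assuming it is acceptable for the task to write directly under the job's final output path rather than a committer-managed task-attempt directory (WorkOutputPathFix and ensureWorkOutputPath are hypothetical names; FileOutputFormat.setWorkOutputPath itself is package-private, so the key is set directly):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public final class WorkOutputPathFix {
  private WorkOutputPathFix() {}

  // If the work output path is missing (as under Spark's saveAsHadoopDataset),
  // fall back to the job's output path, like Task#initialize does for
  // committers that are not FileOutputCommitters.
  public static void ensureWorkOutputPath(JobConf conf) {
    if (FileOutputFormat.getWorkOutputPath(conf) == null) {
      Path outputPath = FileOutputFormat.getOutputPath(conf);
      if (outputPath != null) {
        // Old-API key name; Hadoop 2's deprecation table forwards it to
        // "mapreduce.task.output.dir", which getWorkOutputPath reads.
        conf.set("mapred.work.output.dir", outputPath.toString());
      }
    }
  }
}

Calling WorkOutputPathFix.ensureWorkOutputPath(conf) at the top of a wrapping OutputFormat's getRecordWriter, before delegating to DeprecatedParquetOutputFormat, is enough to get past the NullPointerException, at the cost of losing the committer's task-attempt isolation.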