DirectOutputCommitter.scala
/*
 * Copyright 2015 Databricks, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred._

/**
 * OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which
 * writes files to a _temporary/ directory before renaming them to their final location, this
 * one simply writes directly to the final location.
 *
 * The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at
 * a time per file (so two tasks racing to write the same file would not work). However, S3
 * supports multiple writers outputting to the same file, and visibility is guaranteed to be
 * atomic. This makes the write idempotent: all writers should be writing the same data, so
 * which one wins is immaterial.
 *
 * Code adapted from Ian Hummel's code from this PR:
 * https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c
 */
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = { }

  override def setupTask(taskContext: TaskAttemptContext): Unit = { }

  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = {
    // We return true here to guard against implementations that do not handle false correctly.
    // The meaning of returning false is not entirely well defined, so it could be interpreted
    // as an error. Returning true just means that commitTask() will be called, which is a no-op.
    true
  }

  override def commitTask(taskContext: TaskAttemptContext): Unit = { }

  override def abortTask(taskContext: TaskAttemptContext): Unit = { }
  /**
   * Creates a _SUCCESS file to indicate the entire job was successful.
   * This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
   */
  override def commitJob(context: JobContext): Unit = {
    val conf = context.getJobConf
    if (shouldCreateSuccessFile(conf)) {
      val outputPath = FileOutputFormat.getOutputPath(conf)
      if (outputPath != null) {
        val fileSys = outputPath.getFileSystem(conf)
        val filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
        fileSys.create(filePath).close()
      }
    }
  }

  /** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
  private def shouldCreateSuccessFile(conf: JobConf): Boolean = {
    conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
  }
}
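
A minimal sketch of how this committer can be wired into a Spark job through the old mapred API (the object name, app name, and output paths below are illustrative, not part of the gist). JobConf.setOutputCommitter() records the class under mapred.output.committer.class, the same property that saveAsTextFile() and saveAsHadoopFile() consult when committing output:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}

object DirectOutputExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("direct-output-example"))

    // Option 1: register the committer globally. saveAsTextFile() goes through
    // the old mapred API and picks the committer up from this configuration.
    sc.hadoopConfiguration.set(
      "mapred.output.committer.class", classOf[DirectOutputCommitter].getName)
    sc.parallelize(Seq("a", "b", "c"))
      .saveAsTextFile("s3n://my-bucket/out-text") // hypothetical output path

    // Option 2: set the committer per job on an explicit JobConf, and (optionally)
    // suppress the _SUCCESS marker that commitJob() would otherwise create.
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputCommitter(classOf[DirectOutputCommitter])
    jobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
    sc.parallelize(Seq("a", "b", "c"))
      .map(word => (new Text(word), new IntWritable(1)))
      .saveAsHadoopFile(
        "s3n://my-bucket/out-pairs", // hypothetical output path
        classOf[Text],
        classOf[IntWritable],
        classOf[TextOutputFormat[Text, IntWritable]],
        jobConf)

    sc.stop()
  }
}

Note that with speculation enabled, two attempts of the same task may write to the final location concurrently; as the class comment explains, that is safe here only because both attempts write identical data.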
thodemoor Mar 22, 2015

Hi Aaron,

Could you elaborate on needsTaskCommit? Which apps do not handle a false return correctly?

We're adding "objectstore-awareness" to Hadoop (https://issues.apache.org/jira/browse/HADOOP-9565) so you will no longer need to override the output committer manually in Spark. We want to understand this better so we don't reintroduce the same issues.

Thanks!

luxiaoxun Nov 21, 2016

How to use this in Spark?
