Skip to content

Instantly share code, notes, and snippets.

Last active February 26, 2016 08:28
Show Gist options
  • Save apivovarov/4de9a82e467401d005f1 to your computer and use it in GitHub Desktop.
Save apivovarov/4de9a82e467401d005f1 to your computer and use it in GitHub Desktop.
package org.apache.hadoop.mapred;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
* OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which
* writes files to a _temporary/ directory before renaming them to their final location, this
* simply writes directly to the final location.
* The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at
* a time for a file (so two people racing to write the same file would not work). However, S3
* supports multiple writers outputting to the same file, where visibility is guaranteed to be
* atomic. This is a monotonic operation: all writers should be writing the same data, so which
* one wins is immaterial.
* Code adapted from Ian Hummel's code from this PR:
* Add the following settings to mapred-site.xml
* <property>
* <name>mapred.output.committer.class</name>
* <value>org.apache.hadoop.mapred.DirectOutputCommitter</value>
* </property>
public class DirectOutputCommitter extends OutputCommitter {
public void setupJob(JobContext jobContext) throws IOException {
public void setupTask(TaskAttemptContext taskContext) throws IOException {
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
// We return true here to guard against implementations that do not handle false correctly.
// The meaning of returning false is not entirely clear, so it's possible to be interpreted
// as an error. Returning true just means that commitTask() will be called, which is a no-op.
return true;
public void commitTask(TaskAttemptContext taskContext) throws IOException {
public void abortTask(TaskAttemptContext taskContext) throws IOException {
* Creates a _SUCCESS file to indicate the entire job was successful.
* This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
public void commitJob(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
if (shouldCreateSuccessFile(conf)) {
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
FileSystem fileSys = outputPath.getFileSystem(conf);
Path filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME);
/** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
private boolean shouldCreateSuccessFile(JobConf conf) {
return conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment