Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save apivovarov/45f629de10e890f156ac to your computer and use it in GitHub Desktop.
Save apivovarov/45f629de10e890f156ac to your computer and use it in GitHub Desktop.
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
/**
 * OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which
 * writes files to a _temporary/ directory before renaming them to their final location, this
 * simply writes directly to the final location.
 *
 * <p>The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at
 * a time for a file (so two people racing to write the same file would not work). However, S3
 * supports multiple writers outputting to the same file, where visibility is guaranteed to be
 * atomic. This is a monotonic operation: all writers should be writing the same data, so which
 * one wins is immaterial.
 *
 * <p>Direct writing is enabled per filesystem implementation: it is active only when the boolean
 * setting {@code mapred.output.direct.<simple class name of the output FileSystem>} is true.
 * In every other case each operation is delegated unchanged to FileOutputCommitter.
 *
 * <p>Put the jar file containing this class into /usr/lib/hadoop/client and /usr/lib/spark/lib,
 * then add the following settings to mapred-site.xml:
 * <pre>{@code
 * <property>
 *   <name>mapred.output.direct.EmrFileSystem</name>
 *   <value>true</value>
 * </property>
 * <property>
 *   <name>mapred.output.direct.NativeS3FileSystem</name>
 *   <value>true</value>
 * </property>
 * <property>
 *   <name>mapred.output.committer.class</name>
 *   <value>org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter</value>
 * </property>
 * }</pre>
 */
public class DirectFileOutputCommitter extends FileOutputCommitter {
  private static final Log LOG = LogFactory.getLog(DirectFileOutputCommitter.class);

  /** Prefix of the per-filesystem switch; the FileSystem's simple class name is appended. */
  private static final String DIRECT_WRITE_KEY_PREFIX = "mapred.output.direct.";

  /** Boolean setting (read with default {@code true}) that controls writing the success marker. */
  private static final String MARK_SUCCESSFUL_JOBS_KEY =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";

  /** Name of the marker file created in the output directory when a job commits. */
  private static final String SUCCESS_MARKER_NAME = "_SUCCESS";

  /** Final output location; null when no output path or no context was supplied. */
  private final Path outputPath;

  /** True when task output goes straight to {@link #outputPath} with no temporary directory. */
  private final boolean directWrite;

  /**
   * Creates a committer for a task attempt.
   *
   * @param outputPath the job's final output directory; may be null
   * @param context the task attempt context supplying the configuration
   * @throws IOException if the superclass constructor or the filesystem lookup fails
   */
  public DirectFileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    // Keep the path only when a context came with it, exactly as direct mode requires both.
    this.outputPath = (context != null) ? outputPath : null;
    this.directWrite = resolveDirectWrite(outputPath, context);
  }

  /**
   * Creates a committer for a job.
   *
   * @param outputPath the job's final output directory; may be null
   * @param context the job context supplying the configuration
   * @throws IOException if the superclass constructor or the filesystem lookup fails
   */
  public DirectFileOutputCommitter(Path outputPath, JobContext context) throws IOException {
    super(outputPath, context);
    // Keep the path only when a context came with it, exactly as direct mode requires both.
    this.outputPath = (context != null) ? outputPath : null;
    this.directWrite = resolveDirectWrite(outputPath, context);
  }

  /**
   * Skips job setup in direct mode; otherwise delegates to the superclass.
   *
   * @param context the job context
   * @throws IOException if the delegated setup fails
   */
  public void setupJob(JobContext context) throws IOException {
    if (!this.directWrite) {
      super.setupJob(context);
      return;
    }
    LOG.info("Nothing to setup since the outputs are written directly.");
  }

  /**
   * Skips job cleanup in direct mode; otherwise delegates to the superclass.
   *
   * @param context the job context
   * @throws IOException if the delegated cleanup fails
   */
  public void cleanupJob(JobContext context) throws IOException {
    if (!this.directWrite) {
      super.cleanupJob(context);
      return;
    }
    LOG.info("Nothing to clean up since no temporary files were written.");
  }

  /**
   * Does nothing in direct mode; otherwise delegates to the superclass.
   *
   * @param context the task attempt context
   * @throws IOException if the delegated setup fails
   */
  public void setupTask(TaskAttemptContext context) throws IOException {
    if (this.directWrite) {
      return;
    }
    super.setupTask(context);
  }

  /**
   * Only logs in direct mode, where {@link #needsTaskCommit} reports false; otherwise delegates
   * to the superclass.
   *
   * @param context the task attempt context
   * @throws IOException if the delegated commit fails
   */
  public void commitTask(TaskAttemptContext context) throws IOException {
    if (!this.directWrite) {
      super.commitTask(context);
      return;
    }
    LOG.info("Commit should not be called since this task doesnt have any commitable files. Also needsTaskCommit returns false");
  }

  /**
   * Commits the job. In direct mode this runs {@link #cleanupJob} and then, unless disabled by
   * configuration, creates an empty success marker file in the output directory. Otherwise it
   * delegates to the superclass.
   *
   * @param context the job context
   * @throws IOException if the marker file cannot be created or the delegated commit fails
   */
  public void commitJob(JobContext context) throws IOException {
    if (!this.directWrite) {
      super.commitJob(context);
      return;
    }
    // The constructors only enable direct mode when an output path was supplied, so checking
    // our own field is sufficient and needs no superclass accessor.
    if (this.outputPath == null) {
      return;
    }
    this.cleanupJob(context);
    if (context.getConfiguration().getBoolean(MARK_SUCCESSFUL_JOBS_KEY, true)) {
      Path markerPath = new Path(this.outputPath, SUCCESS_MARKER_NAME);
      FileSystem fs = markerPath.getFileSystem(context.getConfiguration());
      // Create and immediately close: only the file's existence matters.
      fs.create(markerPath).close();
    }
  }

  /**
   * Only logs in direct mode; otherwise delegates to the superclass.
   *
   * @param context the task attempt context
   * @throws IOException if the delegated abort fails
   */
  public void abortTask(TaskAttemptContext context) throws IOException {
    if (!this.directWrite) {
      super.abortTask(context);
      return;
    }
    LOG.info("Nothing to clean up on abort since there are no temporary files written");
  }

  /**
   * Reports whether the task has output that must be committed.
   *
   * @param context the task attempt context
   * @return false in direct mode; otherwise the superclass's answer
   * @throws IOException if the delegated check fails
   */
  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
    if (this.directWrite) {
      return false;
    }
    return super.needsTaskCommit(context);
  }

  /**
   * Returns the directory tasks should write into.
   *
   * @return the final output path in direct mode; otherwise the superclass's work path
   * @throws IOException if the delegated lookup fails
   */
  public Path getWorkPath() throws IOException {
    if (this.directWrite) {
      return this.outputPath;
    }
    return super.getWorkPath();
  }

  /**
   * Shared constructor logic: direct mode needs both an output path and a context, plus the
   * per-filesystem flag being set for the filesystem that owns the output path.
   *
   * @param outputPath the output directory handed to the constructor; may be null
   * @param context the job or task attempt context handed to the constructor; may be null
   * @return true when direct writing is enabled for this output path
   * @throws IOException if the filesystem for the path cannot be obtained
   */
  private static boolean resolveDirectWrite(Path outputPath, JobContext context)
      throws IOException {
    if (outputPath == null || context == null) {
      return false;
    }
    return isDirectWrite(
        context.getConfiguration(), outputPath.getFileSystem(context.getConfiguration()));
  }

  /**
   * Looks up the direct-write flag for a filesystem implementation.
   *
   * @param c the configuration to read
   * @param fs the filesystem whose simple class name selects the flag
   * @return the flag's value, or false when it is not set
   */
  private static boolean isDirectWrite(Configuration c, FileSystem fs) {
    return c.getBoolean(DIRECT_WRITE_KEY_PREFIX + fs.getClass().getSimpleName(), false);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment