import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
public class HdfsSyncingLocalFileOutputCommitter extends FileOutputCommitter {
    public static final String PREFIX_LUCENE_INDEX_PART = "part-";

    // Local file system holding the per-reducer scratch directory, and the
    // HDFS file system / path the finished Lucene index part is synced into.
    private final FileSystem localFileSystem;
    private final File localScratchPath;

    private final FileSystem hdfsFileSystem;
    private final Path hdfsSyncPath;

    public HdfsSyncingLocalFileOutputCommitter(File localScratchPath, Path hdfsSyncPath, TaskAttemptContext context) throws IOException {
        super(hdfsSyncPath, context);

        Configuration conf = context.getConfiguration();

        this.localFileSystem = FileSystem.getLocal(conf);
        this.localScratchPath = localScratchPath;

        this.hdfsFileSystem = FileSystem.get(conf);
        this.hdfsSyncPath = hdfsSyncPath;
    }

    public File getLocalScratchPath() {
        return localScratchPath;
    }
    @Override
    public void abortJob(JobContext context, State state) throws IOException {
        // Clean up the local scratch directory before delegating to the default abort handling.
        deleteLocalScratchPath();
        super.abortJob(context, state);
    }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
        deleteLocalScratchPath();
        super.abortTask(context);
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
        // Sync the locally built index part to HDFS before the standard commit,
        // then remove the scratch directory.
        if (localScratchPath.exists()) {
            syncToHdfs(context);
        }
        super.commitTask(context);
        deleteLocalScratchPath();
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        // A commit is needed whenever there is local output to sync, even if the
        // default committer sees nothing to commit.
        return localScratchPath.exists() || super.needsTaskCommit(context);
    }
    private void syncToHdfs(TaskAttemptContext context) throws IOException {
        if (!hdfsFileSystem.mkdirs(hdfsSyncPath)) {
            throw new IOException(String.format("Cannot create HDFS directory at [%s] to sync Lucene index!", hdfsSyncPath));
        }

        // Create a subdirectory in HDFS for the Lucene index part from this particular reducer.
        Path indexPartHdfsFilePath = new Path(hdfsSyncPath, PREFIX_LUCENE_INDEX_PART + context.getTaskAttemptID().getTaskID().getId());

        if (!hdfsFileSystem.mkdirs(indexPartHdfsFilePath)) {
            throw new IOException(String.format("Cannot create HDFS directory at [%s] to sync Lucene index!", indexPartHdfsFilePath));
        }

        // Copy each file of the local index part into the HDFS subdirectory,
        // reporting progress so the task is not killed during a long copy.
        for (File localFile : localScratchPath.listFiles()) {
            context.progress();

            Path localFilePath = new Path("file://" + localFile.getPath());
            if (!localFileSystem.exists(localFilePath)) {
                throw new IOException(String.format("Cannot find local file [%s]!", localFilePath));
            }

            Path hdfsFilePath = new Path(indexPartHdfsFilePath, localFile.getName());
            if (hdfsFileSystem.exists(hdfsFilePath)) {
                throw new IOException(String.format("HDFS file [%s] already exists!", hdfsFilePath));
            }

            hdfsFileSystem.copyFromLocalFile(localFilePath, hdfsFilePath);
        }
    }

    private void deleteLocalScratchPath() {
        // Best-effort cleanup of the scratch directory; failures are logged but not fatal.
        try {
            FileUtils.deleteDirectory(localScratchPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
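
For context, a committer like this is normally returned from the job's OutputFormat. The sketch below shows one way it might be wired in; the LuceneIndexOutputFormat class name, the choice of java.io.tmpdir for the scratch directory, and the omitted record writer are illustrative assumptions, not part of the original gist.

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative only: a hypothetical OutputFormat that plugs in the committer above.
public class LuceneIndexOutputFormat extends FileOutputFormat<NullWritable, NullWritable> {

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        // Local scratch directory where this task attempt builds its index part (assumed location).
        File localScratchPath = new File(System.getProperty("java.io.tmpdir"),
                "lucene-index-" + context.getTaskAttemptID().toString());
        // HDFS directory the committer syncs the finished index part into.
        Path hdfsSyncPath = getOutputPath(context);
        return new HdfsSyncingLocalFileOutputCommitter(localScratchPath, hdfsSyncPath, context);
    }

    @Override
    public RecordWriter<NullWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
        // Index writing happens against the committer's local scratch path and is
        // outside the scope of this gist, so it is not shown here.
        throw new UnsupportedOperationException("Sketch only; index writing not shown.");
    }
}

Under this arrangement the reducer would build its Lucene index under getLocalScratchPath(), and commitTask would push the finished files into HDFS when the task completes.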
@swapnilpatil1682

Hello, how can I get the "Lucene indexing using MapReduce" code in a single file? How can I download it? Please reply ASAP.
