@colinmarc, created October 5, 2015

DirectParquetOutputCommitter.java:
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.parquet.hadoop.ParquetOutputCommitter;
// Hadoop, by default, does a complex thing where it writes the output of tasks
// to /_temporary/task_blahblahblah, and then, once the job is done, renames
// the files into place in the output path.
// That works great on filesystems like HDFS, which support renames, but really
// poorly on S3, which doesn't have that functionality: you can copy keys, but
// not rename them. The Hadoop FileSystem API conveniently papers over this,
// since 'rename' is part of the interface, and the various S3 implementations
// do a full copy when asked to rename.
// Subclassing OutputCommitter (and installing the subclass with
// setOutputCommitter) lets us turn that behavior off entirely. In this
// version, we also make sure to write the Parquet summary files. A usage
// sketch follows the class below.
public class DirectParquetOutputCommitter extends OutputCommitter {
  @Override
  public void setupJob(JobContext context) throws IOException {
    JobConf conf = context.getJobConf();
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null) {
      Path tmpDir = outputPath.getParent();
      FileSystem fs = outputPath.getFileSystem(conf);
      if (!fs.mkdirs(tmpDir)) {
        throw new IOException("Failed to create " + tmpDir.toString());
      }
    }
  }

  @Override
  public void commitJob(JobContext context) throws IOException {
    JobConf conf = context.getJobConf();
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    // Write the Parquet summary metadata alongside the data files, then the
    // _SUCCESS marker that downstream jobs look for.
    ParquetOutputCommitter.writeMetaDataFile(conf, outputPath);
    FileSystem fs = outputPath.getFileSystem(conf);
    fs.create(new Path(outputPath, "_SUCCESS")).close();
  }

  // Tasks write directly to the final output path, so there is nothing to
  // set up, commit, or abort at the task level, and no job-level cleanup.
  @Override
  public void cleanupJob(JobContext context) throws IOException {}

  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {}

  @Override
  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
    return false;
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {}

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {}
}
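For a job that doesn't go through Cascading, the committer can be installed directly on the JobConf. A minimal sketch, not part of the gist; JobSetup and the bucket path are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class JobSetup {
  public static void configure(JobConf conf) {
    // Tasks write straight to the final output path; no _temporary
    // directory, no rename-on-commit.
    conf.setOutputCommitter(DirectParquetOutputCommitter.class);
    FileOutputFormat.setOutputPath(conf, new Path("s3n://my-bucket/output"));
  }
}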
DirectParquetScroogeScheme.java:

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import com.twitter.scrooge.ThriftStruct;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.parquet.cascading.ParquetValueScheme.Config;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.scrooge.ParquetScroogeScheme;
public class DirectParquetScroogeScheme<T extends ThriftStruct> extends ParquetScroogeScheme<T> {
  public DirectParquetScroogeScheme(Class<T> klass) {
    super(klass);
  }

  public DirectParquetScroogeScheme(FilterPredicate filterPredicate, Class<T> klass) {
    super(filterPredicate, klass);
  }

  public DirectParquetScroogeScheme(Config<T> config) {
    super(config);
  }

  // Install the direct committer whenever this scheme is used as a sink.
  @Override
  public void sinkConfInit(FlowProcess<JobConf> fp, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
    super.sinkConfInit(fp, tap, jobConf);
    jobConf.setOutputCommitter(DirectParquetOutputCommitter.class);
  }
}
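Wiring the scheme into a flow looks the same as with a stock ParquetScroogeScheme. A minimal sink-side sketch, not part of the gist; FlowSetup and the bucket path are hypothetical, and MyStruct stands in for any Scrooge-generated struct:

import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class FlowSetup {
  // MyStruct: placeholder for a Scrooge-generated ThriftStruct.
  public static Tap parquetSink() {
    // When the flow is planned, sinkConfInit on the scheme installs
    // DirectParquetOutputCommitter on the JobConf.
    return new Hfs(
        new DirectParquetScroogeScheme<>(MyStruct.class),
        "s3n://my-bucket/parquet-output",
        SinkMode.REPLACE);
  }
}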