Created October 5, 2015 22:07
Save colinmarc/6a24aa2024417fa02a3f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.mapred.FileOutputFormat; | |
import org.apache.hadoop.mapred.OutputCommitter; | |
import org.apache.hadoop.mapred.JobContext; | |
import org.apache.hadoop.mapred.TaskAttemptContext; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.parquet.hadoop.ParquetOutputCommitter; | |
// Hadoop, by default, does a complex thing where it writes the output of tasks | |
// to /_temporary/task_blahblahblah, and then, once the job is done, renames | |
// them into place in the output path. | |
// This works great with filesystems like HDFS which support renames. But it | |
// works really poorly with S3, because S3 doesn't have that functionality; | |
// one can copy keys, but not rename them. The hadoop FileSystem API conveniently | |
// papers over this, since 'rename' is part of the interface, and the various | |
// S3 implementations do a copy when asked to rename. | |
// Creating a class that overrides OutputCommitter (and setting it with | |
// setOutputCommitter) allows us to turn off this functionality. In this | |
// version, we also make sure to write the parquet summary files. | |
public class DirectParquetOutputCommitter extends OutputCommitter { | |
@Override | |
public void setupJob(JobContext context) throws IOException { | |
JobConf conf = context.getJobConf(); | |
Path outputPath = FileOutputFormat.getOutputPath(conf); | |
if (outputPath != null) { | |
Path tmpDir = outputPath.getParent(); | |
FileSystem fs = outputPath.getFileSystem(conf); | |
if (!fs.mkdirs(outputPath.getParent())) { | |
throw new IOException("Failed to create " + tmpDir.toString()); | |
} | |
} | |
} | |
@Override | |
public void commitJob(JobContext context) throws IOException { | |
JobConf conf = context.getJobConf(); | |
Path outputPath = FileOutputFormat.getOutputPath(conf); | |
ParquetOutputCommitter.writeMetaDataFile(conf, outputPath); | |
FileSystem fs = outputPath.getFileSystem(conf); | |
fs.create(new Path(outputPath, "_SUCCESS")).close(); | |
} | |
@Override | |
public void cleanupJob(JobContext context) throws IOException { | |
} | |
@Override | |
public void setupTask(TaskAttemptContext context) throws IOException { | |
} | |
@Override | |
public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { | |
return false; | |
} | |
@Override | |
public void commitTask(TaskAttemptContext context) throws IOException { | |
} | |
@Override | |
public void abortTask(TaskAttemptContext context) throws IOException { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cascading.flow.FlowProcess; | |
import cascading.tap.Tap; | |
import com.twitter.scrooge.ThriftStruct; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.OutputCollector; | |
import org.apache.hadoop.mapred.RecordReader; | |
import org.apache.parquet.cascading.ParquetValueScheme; | |
import org.apache.parquet.filter2.predicate.FilterPredicate; | |
import org.apache.parquet.hadoop.ParquetInputFormat; | |
import org.apache.parquet.hadoop.ParquetOutputFormat; | |
import org.apache.parquet.scrooge.ParquetScroogeScheme; | |
import org.apache.parquet.cascading.ParquetValueScheme.Config; | |
public class DirectParquetScroogeScheme<T extends ThriftStruct> extends ParquetScroogeScheme<T> { | |
public DirectParquetScroogeScheme(Class<T> klass) { | |
super(klass); | |
} | |
public DirectParquetScroogeScheme(FilterPredicate filterPredicate, Class<T> klass) { | |
super(filterPredicate, klass); | |
} | |
public DirectParquetScroogeScheme(Config<T> config) { | |
super(config); | |
} | |
@Override | |
public void sinkConfInit(FlowProcess<JobConf> fp, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) { | |
super.sinkConfInit(fp, tap, jobConf); | |
jobConf.setOutputCommitter(DirectParquetOutputCommitter.class); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.