Skip to content

Instantly share code, notes, and snippets.

@jmason
Created April 13, 2018 16:33
Show Gist options
  • Save jmason/015fbd242ec241d49fef76606a26058a to your computer and use it in GitHub Desktop.
Save jmason/015fbd242ec241d49fef76606a26058a to your computer and use it in GitHub Desktop.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.orc.CompressionCodec;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.Writer;
import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.StreamName;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;
import java.io.IOException;
/**
 * A drop-in replacement for {@link OrcOutputFormat} whose output files omit
 * UTF-8 bloom filter streams, which Presto cannot yet read.
 * See https://github.com/prestodb/presto/issues/7120 .
 *
 * <p>This is a workaround rather than a supported ORC extension point: it
 * plugs a custom physical writer into the normal ORC writer options so the
 * offending streams are never emitted.
 *
 * <p>Use it wherever you would use {@code OrcOutputFormat} in a Hadoop mapper
 * or reducer; the usual {@code OrcOutputFormat} setters (compressor class,
 * output path, etc.) still apply.
 *
 * @param <V> the value type written to the ORC file
 */
public class PrestoCompatibleOrcOutputFormat<V extends Writable> extends OrcOutputFormat<V> {

  /** File name extension passed to {@code getDefaultWorkFile} for the task's output file. */
  private static final String EXTENSION = ".orc";

  /**
   * Creates a record writer for this task attempt, backed by an ORC writer
   * that uses {@link PrestoCompatiblePhysicalWriter} as its physical writer.
   *
   * @param taskAttemptContext supplies the job configuration and the work file location
   * @return a record writer wrapping the ORC {@link Writer}
   * @throws IOException if the file system, physical writer or ORC writer cannot be created
   */
  @Override
  public RecordWriter<NullWritable, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
      throws IOException {
    Configuration jobConf = taskAttemptContext.getConfiguration();
    Path outputFile = getDefaultWorkFile(taskAttemptContext, EXTENSION);

    // Build the writer options the same way the mapred OrcOutputFormat does,
    // then swap in the physical writer that filters bloom filter streams.
    OrcFile.WriterOptions writerOptions =
        org.apache.orc.mapred.OrcOutputFormat.buildOptions(jobConf);
    FileSystem fileSystem = outputFile.getFileSystem(writerOptions.getConfiguration());
    writerOptions.physicalWriter(
        new PrestoCompatiblePhysicalWriter(fileSystem, outputFile, writerOptions));

    Writer orcWriter = OrcFile.createWriter(outputFile, writerOptions);
    return new OrcMapreduceRecordWriter<>(orcWriter);
  }

  /**
   * A {@link PhysicalFsWriter} that drops every bloom filter stream of kind
   * {@code BLOOM_FILTER_UTF8} and passes all other bloom filter streams
   * through to the superclass unchanged.
   */
  public static class PrestoCompatiblePhysicalWriter extends PhysicalFsWriter {

    /**
     * Forwards all arguments to the {@link PhysicalFsWriter} constructor.
     *
     * @param fs the file system the output file lives on
     * @param filename the path of the ORC file to write
     * @param opts the ORC writer options
     * @throws IOException if the superclass constructor fails
     */
    PrestoCompatiblePhysicalWriter(FileSystem fs, Path filename, OrcFile.WriterOptions opts)
        throws IOException {
      super(fs, filename, opts);
    }

    /**
     * Writes the given bloom filter stream unless it is a UTF-8 bloom filter,
     * in which case nothing is written at all.
     *
     * @param name identifies the stream; its kind decides whether it is written
     * @param bloom the bloom filter index to write
     * @param codec the compression codec handed on to the superclass
     * @throws IOException if the superclass fails to write the stream
     */
    @Override
    public void writeBloomFilter(StreamName name,
                                 OrcProto.BloomFilterIndex.Builder bloom,
                                 CompressionCodec codec) throws IOException {
      if (name.getKind().equals(OrcProto.Stream.Kind.BLOOM_FILTER_UTF8)) {
        // Inhibit writing this stream entry entirely.
        return;
      }
      super.writeBloomFilter(name, bloom, codec);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment