Skip to content

Instantly share code, notes, and snippets.

@kennycason
Created January 3, 2019 21:58
Show Gist options
  • Save kennycason/7bdb418eb54a824721180811c292ddfd to your computer and use it in GitHub Desktop.
Hadoop IO - BinaryFileInputFormat
package com.kennycason.hadoop.io;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * An {@link InputFormat} that reads each input file as a single binary record
 * ({@code byte[]}). Splitting is disabled so every record carries a whole file.
 */
public class BinaryFileInputFormat extends FileInputFormat<NullWritable, ImmutableBytesWritable> {

    /** Files must be delivered whole, so never split them. */
    @Override
    protected boolean isSplitable(final JobContext context, final Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, ImmutableBytesWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
        return new BinaryFileRecordReader();
    }
}
public class BinaryFileMapReduceJob {
private static final long INPUT_MAX_SPLIT_SIZE = (long) 1024 * 1024 * 1024;
/** initialize m/r job code emitted **/
private void setupInputParameters(final Job jobInstance) throws IOException {
jobInstance.setInputFormatClass(CombineBinaryFileInputFormat.class);
FileInputFormat.addInputPath(jobInstance, new Path("/user/kenny/binary_files/");
jobInstance.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", INPUT_MAX_SPLIT_SIZE);
jobInstance.setMapperClass(BinaryFileMapReduceMapper.class);
}
}
/**
 * Mapper that receives one whole binary file per {@code map} invocation.
 * The input key is ignored; the value holds the raw bytes of a single file.
 */
public class BinaryFileMapReduceMapper extends Mapper<Writable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {}

    @Override
    protected void map(final Writable unusedKey,
                       final ImmutableBytesWritable fileContents,
                       final Context context) throws IOException, InterruptedException {
        // process the file's bytes here
    }
}
package com.kennycason.hadoop.io;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
 * A {@link RecordReader} that emits exactly one record per split: the key is
 * {@link NullWritable#get()} and the value holds the entire file's bytes.
 * Relies on the input format marking files non-splitable, so each
 * {@link FileSplit} covers a complete file.
 */
public class BinaryFileRecordReader extends RecordReader<NullWritable, ImmutableBytesWritable> {
    private FSDataInputStream fsDataInputStream;
    private byte[] fileBytes;
    private int fileBytesLength;
    private boolean finishedReadingBytes;

    @Override
    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException {
        if (!(inputSplit instanceof FileSplit)) {
            throw new IllegalArgumentException("Only FileSplits are allowed, found: " + inputSplit.getClass());
        }
        final FileSplit fileSplit = (FileSplit) inputSplit;
        final long splitLength = fileSplit.getLength();
        // BUG FIX: the original narrowed the length to int unconditionally, which
        // silently corrupts the buffer size for files >= 2 GiB. Fail fast instead.
        if (splitLength > Integer.MAX_VALUE) {
            throw new IOException("File too large to buffer as a single byte[]: "
                    + fileSplit.getPath() + " (" + splitLength + " bytes)");
        }
        fsDataInputStream = fileSplit.getPath()
                .getFileSystem(context.getConfiguration())
                .open(fileSplit.getPath());
        fileBytesLength = (int) splitLength;
        fileBytes = new byte[fileBytesLength];
    }

    /** Reads the whole file on the first call; all later calls report exhaustion. */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (finishedReadingBytes) {
            return false;
        }
        fsDataInputStream.readFully(fileBytes);
        finishedReadingBytes = true;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() {
        return NullWritable.get();
    }

    @Override
    public ImmutableBytesWritable getCurrentValue() {
        return new ImmutableBytesWritable(fileBytes);
    }

    /** Single-record reader: progress is all-or-nothing. */
    @Override
    public float getProgress() {
        return finishedReadingBytes ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // BUG FIX: guard against close() before (or after a failed) initialize(),
        // which would otherwise throw NullPointerException.
        if (fsDataInputStream != null) {
            fsDataInputStream.close();
        }
    }
}
package com.kennycason.hadoop.io;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;
import java.io.IOException;
/**
 * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
 * <code>BinaryFileInputFormat</code>: many small binary files are packed into
 * one split, while each file is still delivered whole as a single record.
 *
 * @see CombineFileInputFormat
 */
public class CombineBinaryFileInputFormat extends CombineFileInputFormat<NullWritable, ImmutableBytesWritable> {
    @Override
    public RecordReader<NullWritable, ImmutableBytesWritable> createRecordReader(final InputSplit split,
                                                                                 final TaskAttemptContext context) throws IOException {
        // FIX: the original instantiated a raw CombineFileRecordReader, discarding the
        // key/value type parameters (unchecked-conversion warning). Use the diamond operator.
        return new CombineFileRecordReader<>((CombineFileSplit) split, context, BinaryFileRecordReaderWrapper.class);
    }

    /**
     * A record reader that may be passed to <code>CombineFileRecordReader</code>
     * so that it can be used in a <code>CombineFileInputFormat</code>-equivalent
     * for <code>BinaryFileInputFormat</code>.
     *
     * @see CombineFileRecordReader
     * @see CombineFileInputFormat
     * @see BinaryFileInputFormat
     */
    private static class BinaryFileRecordReaderWrapper extends CombineFileRecordReaderWrapper<NullWritable, ImmutableBytesWritable> {
        public BinaryFileRecordReaderWrapper(final CombineFileSplit split,
                                             final TaskAttemptContext context,
                                             final Integer idx) throws IOException, InterruptedException {
            // Delegate to the single-file reader for the idx-th file in the combined split.
            super(new BinaryFileInputFormat(), split, context, idx);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment