Hadoop input format for swallowing entire files.

WholeFile.java
package forma;

import forma.WholeFileInputFormat;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * Cascading Scheme that sources each input file as a single tuple of
 * (file name, file contents). Sinking is not supported.
 */
public class WholeFile extends Scheme {

  public WholeFile(Fields fields) {
    super(fields);
  }

  @Override
  public void sourceInit(Tap tap, JobConf conf) {
    // Delegate to the old-API input format that reads whole, unsplit files.
    conf.setInputFormat(WholeFileInputFormat.class);
  }

  @Override
  public void sinkInit(Tap tap, JobConf conf) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public Tuple source(Object key, Object value) {
    // key is the file name (Text), value is the file contents (BytesWritable).
    Tuple tuple = new Tuple();
    tuple.add(key.toString());
    tuple.add(value);
    return tuple;
  }

  @Override
  public void sink(TupleEntry te, OutputCollector oc) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }
}
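
A minimal usage sketch (not part of the gist) of how the scheme might be wired into a Cascading source tap. It assumes Cascading 1.x, where cascading.tap.Hfs takes a Scheme and a path string; the field names "filename" and "contents" and the class WholeFileTapExample are illustrative only.

package forma;

import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class WholeFileTapExample {
  // Hypothetical helper: builds a source tap whose tuples carry
  // (file name, raw file bytes) as produced by WholeFile.source().
  public static Tap wholeFileSource(String path) {
    return new Hfs(new WholeFile(new Fields("filename", "contents")), path);
  }
}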
WholeFileInputFormat-NewAPI.java
package forma;

import forma.WholeFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.fs.Path;

/**
 * New-API (org.apache.hadoop.mapreduce) input format that treats every file
 * as a single, unsplittable record.
 */
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // Never split: each file must be read in one piece by a single mapper.
    return false;
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }
}
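
A minimal driver sketch (not part of the gist) showing how the new-API format might be plugged into a job. It assumes Hadoop 2.x (Job.getInstance); WholeFileDriver and the argument handling are illustrative, and mapper/output setup is omitted.

package forma;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "whole-file-example");
    job.setJarByClass(WholeFileDriver.class);

    // Each input file becomes a single (NullWritable, BytesWritable) record.
    job.setInputFormatClass(WholeFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    // Mapper, reducer and output format configuration omitted for brevity.

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}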
WholeFileInputFormat.java
package forma;

import java.io.IOException;

import forma.WholeFileRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Old-API (org.apache.hadoop.mapred) input format that treats every file as a
 * single, unsplittable record keyed by its file name.
 */
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    // Never split: each file must be read in one piece by a single mapper.
    return false;
  }

  @Override
  public RecordReader<Text, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new WholeFileRecordReader((FileSplit) split, job);
  }
}
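
A corresponding old-API driver sketch (not part of the gist), assuming the classic org.apache.hadoop.mapred job client; WholeFileOldApiDriver is an illustrative name and the rest of the job setup is omitted.

package forma;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WholeFileOldApiDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WholeFileOldApiDriver.class);
    conf.setJobName("whole-file-example");

    // Each input file becomes a single (Text file name, BytesWritable contents) record.
    conf.setInputFormat(WholeFileInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(BytesWritable.class);
    // Mapper, reducer and output format configuration omitted for brevity.

    JobClient.runJob(conf);
  }
}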
WholeFileRecordReader-NewAPI.java
package forma;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * New-API record reader that emits exactly one record per split: a
 * NullWritable key and the entire file contents as a BytesWritable value.
 */
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private boolean processed = false;
  private NullWritable key = NullWritable.get();
  private BytesWritable value = new BytesWritable();

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) inputSplit;
    this.conf = taskAttemptContext.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (!processed) {
      // Read the entire file into memory in one shot.
      byte[] contents = new byte[(int) fileSplit.getLength()];

      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);

      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close; the input stream is closed in nextKeyValue().
  }
}
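
A small new-API mapper sketch (not part of the gist) that consumes the (NullWritable, BytesWritable) records emitted by this reader; WholeFileSizeMapper is an illustrative name, and it simply reports each file's payload size.

package forma;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileSizeMapper
    extends Mapper<NullWritable, BytesWritable, Text, LongWritable> {

  @Override
  protected void map(NullWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    // The split covers exactly one file, so its path identifies the record.
    String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    // getLength() is the real payload size; getBytes() may be over-allocated.
    context.write(new Text(fileName), new LongWritable(value.getLength()));
  }
}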
WholeFileRecordReader.java
package forma;

import java.io.IOException;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Old-API record reader that emits exactly one record per split: the file
 * name as a Text key and the entire file contents as a BytesWritable value.
 */
class WholeFileRecordReader implements RecordReader<Text, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private boolean processed = false;

  public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
    this.fileSplit = fileSplit;
    this.conf = conf;
  }

  @Override
  public boolean next(Text key, BytesWritable value) throws IOException {
    if (!processed) {
      // Read the entire file into memory in one shot.
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();

      // The key is the file name; the value is the full file contents.
      String fileName = file.getName();
      key.set(fileName);

      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public BytesWritable createValue() {
    return new BytesWritable();
  }

  @Override
  public long getPos() throws IOException {
    return processed ? fileSplit.getLength() : 0;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close; the input stream is closed in next().
  }
}
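
And the old-API equivalent (not part of the gist): a mapper consuming the (Text, BytesWritable) records from this reader; WholeFileSizeOldApiMapper is an illustrative name.

package forma;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileSizeOldApiMapper extends MapReduceBase
    implements Mapper<Text, BytesWritable, Text, LongWritable> {

  @Override
  public void map(Text fileName, BytesWritable contents,
      OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
    // The key already carries the file name; emit it with the payload size.
    output.collect(fileName, new LongWritable(contents.getLength()));
  }
}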

Hello sritie, and thank you for sharing your code. I tried to use it with Hadoop 2.2.0 and unfortunately I get the following error:

org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1396022683232_0016_m_000003_2 - exited : Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

Do you have an idea where this error could come from?
