
@sritchie /WholeFile.java
Hadoop input format for swallowing entire files.
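For context, here is a minimal sketch of how the Scheme below might be wired into a Cascading flow. It assumes Cascading 1.x (matching the JobConf-based API used in this gist); the class name, field names, and path are illustrative, not part of the original code.

// Usage sketch (hypothetical class; assumes Cascading 1.x).
package forma;

import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class WholeFileExample {
  public static Tap wholeFileSource(String path) {
    // Each tuple read from this tap holds (filename, file bytes).
    return new Hfs(new WholeFile(new Fields("filename", "bytes")), path);
  }
}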
// WholeFile.java: a Cascading Scheme that reads each input file as a single tuple.
package forma;

import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;

public class WholeFile extends Scheme {

  public WholeFile(Fields fields) {
    super(fields);
  }

  @Override
  public void sourceInit(Tap tap, JobConf conf) {
    // Cascading configures jobs through the old mapred API, so this wires in
    // the JobConf-based WholeFileInputFormat defined below.
    conf.setInputFormat(WholeFileInputFormat.class);
  }

  @Override
  public void sinkInit(Tap tap, JobConf conf) throws IOException {
    // Source-only scheme: sinking is not implemented.
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public Tuple source(Object key, Object value) {
    // One tuple per file: (filename, file contents).
    Tuple tuple = new Tuple();
    tuple.add(key.toString());
    tuple.add(value);
    return tuple;
  }

  @Override
  public void sink(TupleEntry te, OutputCollector oc) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }
}
// WholeFileInputFormat (new org.apache.hadoop.mapreduce API). Note that this
// version and the mapred version below share a class name and package, so only
// one of the two can be compiled into the project at a time.
package forma;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // Never split: each file must arrive intact as a single record.
    return false;
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }
}
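Outside of Cascading, the mapreduce-API format above would be wired into a plain MapReduce job roughly as follows. The driver class is hypothetical, and the mapper and output settings are elided.

// Driver sketch (hypothetical class; assumes the Hadoop 1.x-era Job constructor).
package forma;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "whole-file");
    job.setJarByClass(WholeFileDriver.class);
    job.setInputFormatClass(WholeFileInputFormat.class); // one record per file
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // ... set mapper, output key/value classes, and output path as usual ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}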
// WholeFileInputFormat (old org.apache.hadoop.mapred API). This is the variant
// the Cascading Scheme above wires in via conf.setInputFormat(...), and it keys
// each record by filename rather than NullWritable.
package forma;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    // Never split: each file must arrive intact as a single record.
    return false;
  }

  @Override
  public RecordReader<Text, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new WholeFileRecordReader((FileSplit) split, job);
  }
}
// WholeFileRecordReader (new org.apache.hadoop.mapreduce API): reads the whole
// file backing the split into a single BytesWritable value.
package forma;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private boolean processed = false;
  private NullWritable key = NullWritable.get();
  private BytesWritable value = new BytesWritable();

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) inputSplit;
    this.conf = taskAttemptContext.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (!processed) {
      // The int cast caps supported file size at Integer.MAX_VALUE bytes.
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close; the stream is closed in nextKeyValue().
  }
}
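To show what the mapreduce-API reader above actually delivers, a mapper consuming it might look like the following sketch. The class is hypothetical; since the key is NullWritable, the filename is recovered from the split instead.

// Mapper sketch (hypothetical; emits each filename with its size in bytes).
package forma;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileMapper
    extends Mapper<NullWritable, BytesWritable, Text, IntWritable> {

  @Override
  protected void map(NullWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    String name = ((FileSplit) context.getInputSplit()).getPath().getName();
    // Only the first getLength() bytes of getBytes() are valid file data.
    context.write(new Text(name), new IntWritable(value.getLength()));
  }
}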
// WholeFileRecordReader (old org.apache.hadoop.mapred API): pairs with the
// mapred WholeFileInputFormat above and keys each record by filename.
package forma;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

class WholeFileRecordReader implements RecordReader<Text, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private boolean processed = false;

  public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
    this.fileSplit = fileSplit;
    this.conf = conf;
  }

  @Override
  public boolean next(Text key, BytesWritable value) throws IOException {
    if (!processed) {
      // The int cast caps supported file size at Integer.MAX_VALUE bytes.
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      key.set(file.getName());
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public BytesWritable createValue() {
    return new BytesWritable();
  }

  @Override
  public long getPos() throws IOException {
    return processed ? fileSplit.getLength() : 0;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close; the stream is closed in next().
  }
}
@strokyl

Hello sritchie, and thank you for sharing your code. I tried to use it with Hadoop 2.2.0 and unfortunately I get the following error:

org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1396022683232_0016_m_000003_2 - exited : Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

Do you have an idea where the error could come from?

@qsLI

org.apache.hadoop.mapred is the old interface and org.apache.hadoop.mapreduce is the new interface; they are mismatched.
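For anyone hitting the same message: "Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected" typically means the job was compiled against Hadoop 1.x, where TaskAttemptContext was a class, and run on Hadoop 2.x, where it became an interface. Recompiling the mapreduce-API classes against the 2.2.0 artifacts, or using only the mapred-API classes that the Cascading Scheme relies on, should avoid the mismatch.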
