Skip to content

Instantly share code, notes, and snippets.

@viacoban
Created February 13, 2013 15:21
Show Gist options
  • Save viacoban/4945325 to your computer and use it in GitHub Desktop.
Save viacoban/4945325 to your computer and use it in GitHub Desktop.
multi-input-crunch-source-impl
/**
 * Input format built on {@link CombineFileInputFormat} whose record reader is a
 * {@link CombineFileRecordReader} driven by {@link MultiSequenceFileRecordReader}.
 *
 * @param <K> key type produced by the reader
 * @param <V> value type produced by the reader
 */
public class MultiSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {

  /**
   * Creates the record reader for a combined split.
   *
   * @param split   the split to read; cast to {@link CombineFileSplit} without a prior check
   * @param context the task attempt context handed through to the reader
   * @return a {@link CombineFileRecordReader} that uses
   *         {@link MultiSequenceFileRecordReader} as its per-chunk reader class
   * @throws IOException if the combined reader cannot be constructed
   */
  @SuppressWarnings("unchecked")
  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException {
    final CombineFileSplit combinedSplit = (CombineFileSplit) split;
    // Raw type on purpose: the per-chunk reader class literal carries no type arguments.
    return new CombineFileRecordReader(
        combinedSplit, context, MultiSequenceFileRecordReader.class);
  }
}
/**
 * Record reader for a single chunk of a {@link CombineFileSplit}.
 *
 * <p>The chunk is selected by {@code index}; all reading is delegated to a
 * {@link SequenceFileRecordReader} that is initialized with a {@link FileSplit}
 * describing just that chunk.
 *
 * @param <K> key type
 * @param <V> value type
 */
public final class MultiSequenceFileRecordReader<K, V> extends RecordReader<K, V> {

  /** Position of the chunk inside the combined split that this reader serves. */
  private final int index;
  private CombineFileSplit split;
  private TaskAttemptContext context;
  /** Underlying reader; {@code null} once {@link #close()} has been called. */
  private RecordReader<K, V> delegate;

  /**
   * Creates a reader for the chunk at {@code index} of {@code split}.
   *
   * <p>The parameter list (including the boxed {@code Integer}) is kept as is because
   * this class is handed to {@code CombineFileRecordReader} as a class literal, which
   * is expected to locate this constructor reflectively.
   *
   * @param split   the combined split containing the chunk
   * @param context the task attempt context
   * @param index   position of the chunk within {@code split}
   * @throws IOException declared for compatibility with the expected constructor shape
   */
  public MultiSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context,
      Integer index) throws IOException {
    this.split = split;
    this.context = context;
    this.index = index;
    this.delegate = newDelegate();
  }

  /**
   * Points the delegate at the chunk selected by {@code index}.
   *
   * <p>If the reader was closed before, a fresh delegate is created first.
   *
   * @param genericSplit the split to read; must be a {@link CombineFileSplit}
   * @param context      the task attempt context
   * @throws IOException          if the delegate fails to initialize
   * @throws InterruptedException if the delegate is interrupted while initializing
   */
  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.split = (CombineFileSplit) genericSplit;
    this.context = context;
    if (delegate == null) {
      delegate = newDelegate();
    }
    // Carve the single chunk out of the combined split; the locations passed along
    // are those of the combined split as a whole.
    FileSplit fileSplit = new FileSplit(
        this.split.getPath(index),
        this.split.getOffset(index),
        this.split.getLength(index),
        this.split.getLocations());
    delegate.initialize(fileSplit, this.context);
  }

  /** {@inheritDoc} */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return openDelegate().nextKeyValue();
  }

  /** {@inheritDoc} */
  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return openDelegate().getCurrentKey();
  }

  /** {@inheritDoc} */
  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return openDelegate().getCurrentValue();
  }

  /** {@inheritDoc} */
  @Override
  public float getProgress() throws IOException, InterruptedException {
    return openDelegate().getProgress();
  }

  /**
   * Closes the delegate and drops the reference to it. Calling this more than once
   * is harmless.
   *
   * @throws IOException if closing the delegate fails
   */
  @Override
  public synchronized void close() throws IOException {
    if (delegate != null) {
      delegate.close();
      delegate = null;
    }
  }

  /** Creates a fresh, uninitialized sequence-file reader. */
  private RecordReader<K, V> newDelegate() {
    return new SequenceFileRecordReader<K, V>();
  }

  /** Returns the live delegate, or fails with a clear message if the reader is closed. */
  private RecordReader<K, V> openDelegate() {
    final RecordReader<K, V> current = delegate;
    if (current == null) {
      throw new IllegalStateException(
          "MultiSequenceFileRecordReader is closed; call initialize() before reading");
    }
    return current;
  }
}
/**
 * Crunch table source yielding {@link ShortWritable} keys and {@link BytesWritable}
 * values, read through {@link MultiSequenceFileInputFormat}.
 */
public class TrainingRecordSource implements TableSource<ShortWritable, BytesWritable> {

  /**
   * Reports a fixed placeholder size rather than measuring the inputs.
   *
   * @param configuration unused
   * @return {@link Integer#MAX_VALUE}, widened to {@code long}
   */
  @Override
  public long getSize(Configuration configuration) {
    return Integer.MAX_VALUE;
  }

  /**
   * Registers the input format and every input path on the job.
   *
   * @param job the job to configure
   * @param i   unused by this implementation
   * @throws IOException if adding an input path fails
   */
  @Override
  public void configureSource(Job job, int i) throws IOException {
    job.setInputFormatClass(MultiSequenceFileInputFormat.class);
    //traverse fs and add paths as inputs ...
    // NOTE(review): `inputs` is not defined in the visible code; presumably it is
    // populated by the elided traversal mentioned above.
    for (FileStatus status : inputs) {
      MultiSequenceFileInputFormat.addInputPath(job, status.getPath());
    }
  }

  /**
   * @return the same writable-based table type as {@link #getTableType()}
   */
  @Override
  public PType<Pair<ShortWritable, BytesWritable>> getType() {
    return getTableType();
  }

  /**
   * @return a writable-based table type pairing {@link ShortWritable} keys with
   *         {@link BytesWritable} values
   */
  @Override
  public PTableType<ShortWritable, BytesWritable> getTableType() {
    final PType<ShortWritable> keyType = Writables.writables(ShortWritable.class);
    final PType<BytesWritable> valueType = Writables.writables(BytesWritable.class);
    return Writables.tableOf(keyType, valueType);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment