Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
SleepInputFormat: tricks Hadoop into thinking there is input to process when there really isn't (useful for small test jobs)
/**
 * An {@link InputFormat} that fabricates input for small test jobs that have no real data to
 * read.
 *
 * <p>This is a lot like the InputFormat defined in Hadoop's
 * <a href="https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java">SleepJob</a>
 * except that it uses the Hadoop 2 API.
 *
 * <p>{@link #getSplits} returns one {@link EmptySplit} per configured map task
 * ({@code mapreduce.job.maps}, default 1). Each record reader emits {@link #MAP_SLEEP_COUNT}
 * records (default 1):
 * <ul>
 *   <li>The value of a record is its share of {@code REDUCE_SLEEP_COUNT * numReduceTasks}.
 *       That total is split as evenly as possible across the records, and the first
 *       {@code total % count} records get one extra.</li>
 *   <li>The key is the running total of the values emitted by earlier records of the same
 *       reader.</li>
 * </ul>
 */
public static class SleepInputFormat extends InputFormat<IntWritable, IntWritable> {
  /** Config key: number of records each map task's reader emits (default 1; must be >= 0). */
  public static final String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";

  /**
   * Config key: multiplied by the number of reduce tasks to give the total value each map
   * task's reader emits (default 1; must be >= 0).
   */
  public static final String REDUCE_SLEEP_COUNT = "mapreduce.sleepjob.reduce.sleep.count";

  /**
   * Creates one empty split per configured map task.
   *
   * @param jobContext the job whose configuration supplies {@code mapreduce.job.maps}
   * @return {@code mapreduce.job.maps} (default 1) {@link EmptySplit}s; an empty list if that
   *     setting is zero or negative
   */
  @Override
  public List<InputSplit> getSplits(JobContext jobContext) {
    int numSplits = jobContext.getConfiguration().getInt("mapreduce.job.maps", 1);
    // Guard the capacity: ArrayList rejects a negative initial capacity.
    List<InputSplit> ret = new ArrayList<InputSplit>(Math.max(numSplits, 0));
    for (int i = 0; i < numSplits; ++i) {
      ret.add(new EmptySplit());
    }
    return ret;
  }

  /**
   * Creates a reader that emits {@link #MAP_SLEEP_COUNT} synthetic records, ignoring the split.
   *
   * @param ignored the split to read; unused, since every split is empty
   * @param taskContext supplies the configuration and the number of reduce tasks
   * @return a reader emitting the synthetic records described on the class
   * @throws IOException if {@link #MAP_SLEEP_COUNT} or {@link #REDUCE_SLEEP_COUNT} is negative
   */
  @Override
  public RecordReader<IntWritable, IntWritable> createRecordReader(
      InputSplit ignored, TaskAttemptContext taskContext) throws IOException {
    Configuration conf = taskContext.getConfiguration();
    final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
    if (count < 0) {
      throw new IOException("Invalid map count: " + count);
    }
    final int redCount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
    if (redCount < 0) {
      throw new IOException("Invalid reduce count: " + redCount);
    }
    // Total value this reader distributes across its `count` records.
    final int emitPerMapTask = redCount * taskContext.getNumReduceTasks();
    return new RecordReader<IntWritable, IntWritable>() {
      /** Records emitted so far; never exceeds {@code count}. */
      private int records = 0;
      /** Running total of values emitted so far; becomes the next record's key. */
      private int emitCount = 0;
      private IntWritable key = null;
      private IntWritable value = null;

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context) {
        // Nothing to open: the split is empty and all parameters were read above.
      }

      @Override
      public boolean nextKeyValue() throws IOException {
        // Check exhaustion first. This leaves the last key/value intact and keeps `records`
        // from passing `count`. Checking afterwards made getProgress() exceed 1.0.
        // It also covers count == 0, so the divisions below never see a zero divisor.
        if (records >= count) {
          return false;
        }
        // Spread emitPerMapTask evenly; the first (emitPerMapTask % count) records get +1.
        int emit = emitPerMapTask / count;
        if (emitPerMapTask % count > records) {
          ++emit;
        }
        key = new IntWritable();
        key.set(emitCount);
        value = new IntWritable();
        value.set(emit);
        emitCount += emit;
        ++records;
        return true;
      }

      @Override
      public IntWritable getCurrentKey() {
        return key;
      }

      @Override
      public IntWritable getCurrentValue() {
        return value;
      }

      @Override
      public void close() throws IOException {
        // No resources to release.
      }

      @Override
      public float getProgress() throws IOException {
        // Progress must be a fraction in [0.0, 1.0]; a reader with nothing to emit is done.
        return count == 0 ? 1.0f : records / (float) count;
      }
    };
  }

  /**
   * A split that carries no data: it has zero length, no preferred locations, and an empty
   * serialized form.
   */
  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException {
      // Nothing to serialize.
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      // Nothing to deserialize.
    }

    @Override
    public long getLength() {
      return 0L;
    }

    @Override
    public String[] getLocations() {
      return new String[0];
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment