Skip to content

Instantly share code, notes, and snippets.

@keeganwitt
Last active December 20, 2015 02:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save keeganwitt/6053872 to your computer and use it in GitHub Desktop.
Save keeganwitt/6053872 to your computer and use it in GitHub Desktop.
SleepInputFormat to trick Hadoop into thinking there's input to process, when really there's not (useful in little test jobs)
/**
 * This is a lot like the InputFormat defined in Hadoop's
 * <a href="https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java">SleepJob</a>
 * except that it uses the Hadoop 2 API.
 *
 * <p>It reads no real input. {@link #getSplits(JobContext)} fabricates one
 * {@link EmptySplit} per configured map task, and the record reader synthesizes
 * {@link #MAP_SLEEP_COUNT} records per split.
 */
public static class SleepInputFormat extends InputFormat<IntWritable, IntWritable> {

    /** Configuration key: number of records each map task's reader produces (default 1). */
    public static final String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";

    /** Configuration key: per-reducer count used to size the total emitted per map task (default 1). */
    public static final String REDUCE_SLEEP_COUNT = "mapreduce.sleepjob.reduce.sleep.count";

    /**
     * Creates one empty split per map task.
     *
     * @param jobContext job context whose configuration supplies {@code mapreduce.job.maps}
     *                   (defaults to 1 when unset)
     * @return a list containing that many {@link EmptySplit} instances
     */
    @Override
    public List<InputSplit> getSplits(JobContext jobContext) {
        List<InputSplit> ret = new ArrayList<InputSplit>();
        int numSplits = jobContext.getConfiguration().getInt("mapreduce.job.maps", 1);
        for (int i = 0; i < numSplits; ++i) {
            ret.add(new EmptySplit());
        }
        return ret;
    }

    /**
     * Builds a reader that synthesizes {@link #MAP_SLEEP_COUNT} records.
     *
     * <p>The values of all records sum to
     * {@code REDUCE_SLEEP_COUNT * numReduceTasks}; each key is the running total of the
     * values produced by the preceding records.
     *
     * @param ignored     the split; not used
     * @param taskContext task context supplying the configuration and the number of reduce tasks
     * @return a reader over the synthetic records
     * @throws IOException if either configured count is negative
     */
    @Override
    public RecordReader<IntWritable, IntWritable> createRecordReader(InputSplit ignored, TaskAttemptContext taskContext) throws IOException {
        Configuration conf = taskContext.getConfiguration();
        final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
        if (count < 0) {
            throw new IOException("Invalid map count: " + count);
        }
        final int redCount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
        if (redCount < 0) {
            throw new IOException("Invalid reduce count: " + redCount);
        }
        final int emitPerMapTask = (redCount * taskContext.getNumReduceTasks());

        return new RecordReader<IntWritable, IntWritable>() {
            /** Number of records handed out so far; never exceeds {@code count}. */
            private int records = 0;
            /** Sum of the values of all records handed out so far. */
            private int emitCount = 0;
            private IntWritable key = null;
            private IntWritable value = null;

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context) { }

            /**
             * Advances to the next synthetic record.
             *
             * @return {@code true} if a record was produced, {@code false} once
             *         {@code count} records have been handed out
             */
            @Override
            public boolean nextKeyValue()
                    throws IOException {
                // Stop before touching any state once exhausted (this also covers count == 0),
                // so records never overshoots count and getProgress() stays within [0, 1].
                if (records >= count) {
                    return false;
                }
                key = new IntWritable(emitCount);
                // Spread emitPerMapTask across count records; the first
                // (emitPerMapTask % count) records carry one extra unit each.
                int emit = emitPerMapTask / count;
                if (emitPerMapTask % count > records) {
                    ++emit;
                }
                emitCount += emit;
                value = new IntWritable(emit);
                ++records;
                return true;
            }

            @Override
            public IntWritable getCurrentKey() { return key; }

            @Override
            public IntWritable getCurrentValue() { return value; }

            @Override
            public void close() throws IOException { }

            /**
             * @return fraction of records produced so far, in the range 0.0 to 1.0;
             *         1.0 when there is nothing to produce
             */
            @Override
            public float getProgress() throws IOException {
                return count == 0 ? 1.0f : records / ((float) count);
            }
        };
    }

    /** A split with zero length, no locations and no serialized state. */
    public static class EmptySplit extends InputSplit implements Writable {
        /** Writes nothing: the split has no fields. */
        @Override
        public void write(DataOutput out) throws IOException { }

        /** Reads nothing: the split has no fields. */
        @Override
        public void readFields(DataInput in) throws IOException { }

        @Override
        public long getLength() {
            return 0L;
        }

        @Override
        public String[] getLocations() {
            return new String[0];
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment