@chetan
Created February 1, 2012 16:21
Simple subclasses of MRUnit's MapReduceDriver and ReduceDriver that fix the NullPointerException thrown when a reducer uses MultipleOutputs: the mocked reduce Context is stubbed to return a real TaskAttemptID instead of null. A short usage sketch follows the two files.
// MultiOutputMapReduceDriver.java
package org.apache.hadoop.mrunit.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.types.Pair;

@SuppressWarnings("rawtypes")
public class MultiOutputMapReduceDriver<K1, V1, K2 extends Comparable, V2, K3, V3>
        extends MapReduceDriver<K1, V1, K2, V2, K3, V3> {

    public MultiOutputMapReduceDriver() {
        super();
    }

    public MultiOutputMapReduceDriver(Mapper<K1, V1, K2, V2> m, Reducer<K2, V2, K3, V3> r) {
        super(m, r);
    }

    @Override
    public List<Pair<K3, V3>> run() throws IOException {
        List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();

        // Run the map component exactly as the stock MapReduceDriver does.
        for (Pair<K1, V1> input : inputList) {
            LOG.debug("Mapping input (" + input.toString() + ")");
            mapOutputs.addAll(new MapDriver<K1, V1, K2, V2>(getMapper()).withInput(input)
                    .withCounters(getCounters()).withConfiguration(configuration).run());
        }

        List<Pair<K2, List<V2>>> reduceInputs = shuffle(mapOutputs);
        List<Pair<K3, V3>> reduceOutputs = new ArrayList<Pair<K3, V3>>();

        // Run the reduce component through MultiOutputReduceDriver (below) instead of
        // the stock ReduceDriver, so the mocked Context hands MultipleOutputs a
        // non-null TaskAttemptID.
        for (Pair<K2, List<V2>> input : reduceInputs) {
            K2 inputKey = input.getFirst();
            List<V2> inputValues = input.getSecond();
            StringBuilder sb = new StringBuilder();
            formatValueList(inputValues, sb);
            LOG.debug("Reducing input (" + inputKey.toString() + ", " + sb.toString() + ")");
            reduceOutputs.addAll(new MultiOutputReduceDriver<K2, V2, K3, V3>(getReducer())
                    .withCounters(getCounters()).withConfiguration(configuration)
                    .withInputKey(inputKey).withInputValues(inputValues).run());
        }

        return reduceOutputs;
    }
}
// MultiOutputReduceDriver.java
package org.apache.hadoop.mrunit.mapreduce;

import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContextWrapper;
import org.apache.hadoop.mrunit.types.Pair;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class MultiOutputReduceDriver<K1, V1, K2, V2> extends ReduceDriver<K1, V1, K2, V2> {

    public MultiOutputReduceDriver() {
        super();
    }

    public MultiOutputReduceDriver(Reducer<K1, V1, K2, V2> r) {
        super(r);
    }

    @Override
    public List<Pair<K2, V2>> run() throws IOException {
        List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
        inputs.add(new Pair<K1, List<V1>>(inputKey, getInputValues()));
        try {
            MockReduceContextWrapper<K1, V1, K2, V2> wrapper =
                    new MockReduceContextWrapper<K1, V1, K2, V2>(inputs, getCounters(), getConfiguration());
            Reducer<K1, V1, K2, V2>.Context context = wrapper.getMockContext();

            // MultipleOutputs needs context.getTaskAttemptID(), which MRUnit's mock
            // context leaves unstubbed (returning null) and so triggers the NPE.
            // Stub it to return a real TaskAttemptID.
            when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() {
                @Override
                public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable {
                    return new TaskAttemptID();
                }
            });

            getReducer().run(context);
            return wrapper.getOutputs();
        } catch (InterruptedException ie) {
            throw new IOException(ie);
        }
    }
}
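
Usage sketch: a minimal JUnit test, assuming hypothetical WordCountMapper and MultiOutReducer classes, where the reducer writes side outputs through MultipleOutputs in addition to its regular context.write() output. Key/value types and test data are illustrative only.

// MultiOutReducerTest.java (illustrative only)
package org.example.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MultiOutputMapReduceDriver;
import org.junit.Test;

public class MultiOutReducerTest {

    @Test
    public void reducerUsingMultipleOutputsRunsWithoutNPE() throws Exception {
        // WordCountMapper and MultiOutReducer are placeholders for your own classes.
        MultiOutputMapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver =
                new MultiOutputMapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>(
                        new WordCountMapper(), new MultiOutReducer());

        // Expectations cover the reducer's regular output; the point of the custom
        // driver is that the run no longer hits the NPE inside MultipleOutputs.
        driver.withInput(new LongWritable(1), new Text("hello hello"))
              .withOutput(new Text("hello"), new IntWritable(2))
              .runTest();
    }
}

Apart from the class name, the driver is used like the stock MapReduceDriver (withInput/withOutput/runTest); only the reduce phase differs, running through MultiOutputReduceDriver.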
@baotong commented Mar 5, 2013

Can you tell me how to use these two files?

@bitti commented May 24, 2016

Don't use them; upgrade to mrunit 1.1.0.
