package org.apache.hadoop.mrunit.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;

/**
 * Drop-in replacement for MapReduceDriver that runs the reduce phase through
 * MultiOutputReduceDriver, whose mocked context supplies a TaskAttemptID.
 */
@SuppressWarnings("rawtypes")
public class MultiOutputMapReduceDriver<K1, V1, K2 extends Comparable, V2, K3, V3>
    extends MapReduceDriver<K1, V1, K2, V2, K3, V3> {

  public MultiOutputMapReduceDriver() {
    super();
  }

  public MultiOutputMapReduceDriver(Mapper<K1, V1, K2, V2> m, Reducer<K2, V2, K3, V3> r) {
    super(m, r);
  }

  @Override
  public List<Pair<K3, V3>> run() throws IOException {
    // Run the map phase: feed every queued input through a fresh MapDriver.
    List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
    for (Pair<K1, V1> input : inputList) {
      LOG.debug("Mapping input (" + input.toString() + ")");
      mapOutputs.addAll(new MapDriver<K1, V1, K2, V2>(getMapper()).withInput(input)
          .withCounters(getCounters()).withConfiguration(configuration).run());
    }

    // Group the map output by key, then run each group through the customized
    // reduce driver instead of the stock ReduceDriver.
    List<Pair<K2, List<V2>>> reduceInputs = shuffle(mapOutputs);
    List<Pair<K3, V3>> reduceOutputs = new ArrayList<Pair<K3, V3>>();
    for (Pair<K2, List<V2>> input : reduceInputs) {
      K2 inputKey = input.getFirst();
      List<V2> inputValues = input.getSecond();
      StringBuilder sb = new StringBuilder();
      formatValueList(inputValues, sb);
      LOG.debug("Reducing input (" + inputKey.toString() + ", " + sb.toString() + ")");
      reduceOutputs.addAll(new MultiOutputReduceDriver<K2, V2, K3, V3>(getReducer())
          .withCounters(getCounters()).withConfiguration(configuration)
          .withInputKey(inputKey).withInputValues(inputValues).run());
    }
    return reduceOutputs;
  }
}
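
For context, here is a minimal sketch (not part of the gist) of the kind of reducer these drivers are meant to exercise: one that writes side outputs through org.apache.hadoop.mapreduce.lib.output.MultipleOutputs and therefore needs a usable TaskAttemptID from the context. The class name, the key/value types, and the "summary" named output are illustrative assumptions; in a real job the named output has to be registered with MultipleOutputs.addNamedOutput.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Hypothetical reducer: sums counts per key and mirrors the result to a
// "summary" named output alongside the normal job output.
public class CategoryCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

  private MultipleOutputs<Text, LongWritable> mos;

  @Override
  protected void setup(Context context) {
    // MultipleOutputs keeps the context it is given here and reads task
    // information (including the attempt id) from it when writing.
    mos = new MultipleOutputs<Text, LongWritable>(context);
  }

  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get();
    }
    LongWritable result = new LongWritable(sum);
    mos.write("summary", key, result); // "summary" must be registered on the job
    context.write(key, result);
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    mos.close();
  }
}
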
package org.apache.hadoop.mrunit.mapreduce;

import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContextWrapper;
import org.apache.hadoop.mrunit.types.Pair;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * ReduceDriver variant whose mocked Reducer.Context returns a TaskAttemptID,
 * so reducers that depend on context.getTaskAttemptID() (e.g. via
 * MultipleOutputs) can run under MRUnit.
 */
public class MultiOutputReduceDriver<K1, V1, K2, V2> extends ReduceDriver<K1, V1, K2, V2> {

  public MultiOutputReduceDriver() {
    super();
  }

  public MultiOutputReduceDriver(Reducer<K1, V1, K2, V2> r) {
    super(r);
  }

  @Override
  public List<Pair<K2, V2>> run() throws IOException {
    List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
    inputs.add(new Pair<K1, List<V1>>(inputKey, getInputValues()));
    try {
      MockReduceContextWrapper<K1, V1, K2, V2> wrapper =
          new MockReduceContextWrapper<K1, V1, K2, V2>(inputs, getCounters(), getConfiguration());
      Reducer<K1, V1, K2, V2>.Context context = wrapper.getMockContext();

      // MRUnit's mock context does not stub getTaskAttemptID(); supply one so
      // context-dependent code such as MultipleOutputs gets a non-null id.
      when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() {
        @Override
        public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable {
          return new TaskAttemptID();
        }
      });

      getReducer().run(context);
      return wrapper.getOutputs();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
}
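
A minimal usage sketch, assuming JUnit 4 and the classes above are on the classpath. The reducer name and types are hypothetical; it emits the task attempt id it reads from the context, the same call MultipleOutputs relies on, to show that the patched mock context supplies one.

package org.apache.hadoop.mrunit.mapreduce;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Assert;
import org.junit.Test;

public class MultiOutputReduceDriverTest {

  // Hypothetical reducer that emits the task attempt id it sees in the context.
  public static class AttemptIdReducer extends Reducer<Text, LongWritable, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      context.write(key, new Text(String.valueOf(context.getTaskAttemptID())));
    }
  }

  @Test
  public void mockContextProvidesATaskAttemptId() throws IOException {
    MultiOutputReduceDriver<Text, LongWritable, Text, Text> driver =
        new MultiOutputReduceDriver<Text, LongWritable, Text, Text>(new AttemptIdReducer());
    driver.withInputKey(new Text("hadoop"));
    driver.withInputValues(Arrays.asList(new LongWritable(1L), new LongWritable(2L)));

    List<Pair<Text, Text>> output = driver.run();

    Assert.assertEquals(1, output.size());
    Assert.assertFalse("attempt id should not be null",
        "null".equals(output.get(0).getSecond().toString()));
  }
}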