Skip to content

Instantly share code, notes, and snippets.

@nowell-jana
Last active August 29, 2015 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nowell-jana/11265450 to your computer and use it in GitHub Desktop.
Save nowell-jana/11265450 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mrunit.types.Pair;
/**
 * Test double for the old-API {@link MultipleOutputs} that, instead of writing files,
 * hands out one {@link MockTextCollector} per output file name and lets a test read
 * everything collected so far back through {@link #getOutput()}.
 *
 * <p>Named outputs still have to be registered on the {@link JobConf} passed to the
 * constructor (for multi named outputs, {@link #addMultiNamedOutput(JobConf, String)}
 * is a convenience for Text/Text outputs).
 *
 * <p>NOTE(review): no synchronization is used; presumably one instance is driven by a
 * single task thread — confirm before sharing across threads.
 */
public class MockMultipleTextOutputs extends MultipleOutputs {

    /** Job configuration the named outputs are registered in. */
    private final JobConf conf;

    /**
     * Collectors created so far, keyed by base file name: {@code namedOutput} for plain
     * outputs, {@code namedOutput + "_" + multiName} for multi named outputs.
     */
    private final Map<String, MockTextCollector> collectors = new HashMap<String, MockTextCollector>();

    /**
     * Handed to every new {@link MockTextCollector}; presumably used in place of null
     * keys/values — confirm against MockTextCollector.
     */
    private final Text nullTextReplacement;

    /**
     * Creates a mock whose collectors use an empty {@link Text} as the null replacement.
     *
     * @param conf job configuration holding the named-output definitions
     */
    public MockMultipleTextOutputs(JobConf conf) {
        this(conf, new Text(""));
    }

    /**
     * Creates a mock whose collectors use the given null replacement.
     *
     * @param conf job configuration holding the named-output definitions
     * @param nullTextReplacement value passed to every {@link MockTextCollector} created
     */
    public MockMultipleTextOutputs(JobConf conf, Text nullTextReplacement) {
        super(conf);
        this.conf = conf;
        this.nullTextReplacement = nullTextReplacement;
    }

    /**
     * Registers {@code name} on {@code conf} as a multi named output using
     * {@link TextOutputFormat} with {@link Text} keys and values.
     *
     * @param conf job configuration to modify
     * @param name named output to register
     */
    public static void addMultiNamedOutput(JobConf conf, String name) {
        MultipleOutputs.addMultiNamedOutput(conf, name, TextOutputFormat.class, Text.class, Text.class);
    }

    /**
     * Returns the output of every collector created so far, keyed by base file name and
     * sorted by that name.
     *
     * <p>The map itself is new on each call; whether the lists are copies or live views
     * depends on {@code MockTextCollector.getOutputs()} — check before mutating them.
     *
     * @return base file name mapped to that collector's recorded key/value pairs
     */
    public TreeMap<String, List<Pair<Text, Text>>> getOutput() {
        TreeMap<String, List<Pair<Text, Text>>> result = new TreeMap<String, List<Pair<Text, Text>>>();
        for (Map.Entry<String, MockTextCollector> entry : collectors.entrySet()) {
            result.put(entry.getKey(), entry.getValue().getOutputs());
        }
        return result;
    }

    /**
     * Returns the collector for a plain (non-multi) named output.
     *
     * @param namedOutput registered named output
     * @param reporter ignored by this mock
     * @return the collector for {@code namedOutput}, created on first use
     * @throws IllegalArgumentException if the name is invalid or not registered, or if it
     *     is registered as multi (a multi output needs a non-null multi name)
     */
    @SuppressWarnings("rawtypes")
    @Override
    public OutputCollector getCollector(String namedOutput, Reporter reporter) throws IOException {
        return getCollector(namedOutput, null, reporter);
    }

    /**
     * Returns the collector for a named output, or for one instance of a multi named
     * output, creating and caching it on first use.
     *
     * <p>The arguments are validated on every call, including calls that hit the cache,
     * so an invalid combination is rejected consistently rather than only the first time.
     *
     * @param namedOutput registered named output
     * @param multiName instance name for multi named outputs; must be {@code null} for
     *     plain named outputs
     * @param reporter ignored by this mock
     * @return the collector for the resulting base file name
     * @throws IllegalArgumentException if the named output is invalid or not registered,
     *     if {@code multiName} is given for a non-multi output, or if {@code multiName} is
     *     null, empty or non-alphanumeric for a multi output
     */
    @SuppressWarnings({"rawtypes"})
    @Override
    public OutputCollector getCollector(String namedOutput, String multiName,
            Reporter reporter) throws IOException {
        boolean multi = isMultiNamedOutput(conf, namedOutput);
        validate(namedOutput, multiName, multi);

        String baseFileName = multi ? namedOutput + "_" + multiName : namedOutput;
        MockTextCollector collector = collectors.get(baseFileName);
        if (collector == null) {
            collector = new MockTextCollector(conf, nullTextReplacement);
            collectors.put(baseFileName, collector);
        }
        return collector;
    }

    /** Rejects an invalid or unregistered named output, or a bad multi name for it. */
    private void validate(String namedOutput, String multiName, boolean multi) {
        checkNamedOutputName(namedOutput);
        if (!isDefinedNamedOutput(namedOutput)) {
            throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
        }
        if (!multi && multiName != null) {
            throw new IllegalArgumentException(
                    "Name output '" + namedOutput + "' has not been defined as multi");
        }
        if (multi) {
            checkTokenName(multiName);
        }
    }

    /** Returns whether {@code namedOutput} is among the names reported by {@link #getNamedOutputs()}. */
    private boolean isDefinedNamedOutput(String namedOutput) {
        Iterator<String> names = getNamedOutputs();
        while (names.hasNext()) {
            if (names.next().equals(namedOutput)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Requires a non-empty name made only of ASCII letters and digits.
     *
     * <p>The message texts (including their wording) are kept verbatim; tests may match
     * on them.
     *
     * @throws IllegalArgumentException if the name is null, empty, or contains any other char
     */
    private static void checkTokenName(String namedOutput) {
        if (namedOutput == null || namedOutput.length() == 0) {
            throw new IllegalArgumentException("Name cannot be NULL or emtpy");
        }
        for (char ch : namedOutput.toCharArray()) {
            if ((ch >= 'A') && (ch <= 'Z')) {
                continue;
            }
            if ((ch >= 'a') && (ch <= 'z')) {
                continue;
            }
            if ((ch >= '0') && (ch <= '9')) {
                continue;
            }
            throw new IllegalArgumentException("Name cannot be have a '" + ch + "' char");
        }
    }

    /**
     * Applies {@link #checkTokenName(String)} and additionally forbids {@code "part"}.
     *
     * @throws IllegalArgumentException if the name is not a valid named-output name
     */
    private static void checkNamedOutputName(String namedOutput) {
        checkTokenName(namedOutput);
        // Reserved: "part" is the name used for the default output.
        if (namedOutput.equals("part")) {
            throw new IllegalArgumentException("Named output name cannot be 'part'");
        }
    }

    /** Discards all collectors and their recorded output. */
    public void clear() {
        collectors.clear();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment