Last active
August 29, 2015 14:00
-
-
Save nowell-jana/11265450 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.TreeMap; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.OutputCollector; | |
import org.apache.hadoop.mapred.Reporter; | |
import org.apache.hadoop.mapred.TextOutputFormat; | |
import org.apache.hadoop.mapred.lib.MultipleOutputs; | |
import org.apache.hadoop.mrunit.types.Pair; | |
public class MockMultipleTextOutputs extends MultipleOutputs { | |
private JobConf conf; | |
private Map<String, MockTextCollector> collectors = new HashMap<String, MockTextCollector>(); | |
private Text nullTextReplacement; | |
public MockMultipleTextOutputs(JobConf conf) { | |
super(conf); | |
this.conf = conf; | |
nullTextReplacement = new Text(""); | |
} | |
public MockMultipleTextOutputs(JobConf conf, Text nullTextReplacement) { | |
super(conf); | |
this.conf = conf; | |
this.nullTextReplacement = nullTextReplacement; | |
} | |
public static void addMultiNamedOutput(JobConf conf, String name) { | |
MultipleOutputs.addMultiNamedOutput(conf, name, TextOutputFormat.class, Text.class, Text.class); | |
} | |
public TreeMap<String, List<Pair<Text, Text>>> getOutput() { | |
TreeMap<String, List<Pair<Text, Text>>> result = new TreeMap<String, List<Pair<Text,Text>>>(); | |
for (Map.Entry<String, MockTextCollector> entry : collectors.entrySet()) { | |
result.put(entry.getKey(), entry.getValue().getOutputs()); | |
} | |
return result; | |
} | |
@SuppressWarnings("rawtypes") | |
@Override | |
public OutputCollector getCollector(String namedOutput, Reporter reporter) throws IOException { | |
return getCollector(namedOutput, null, reporter); | |
} | |
@SuppressWarnings({"rawtypes"}) | |
@Override | |
public OutputCollector getCollector(String namedOutput, String multiName, | |
Reporter reporter) throws IOException { | |
boolean multi = isMultiNamedOutput(conf, namedOutput); | |
String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput; | |
MockTextCollector existing_collector = collectors.get(baseFileName); | |
if (existing_collector != null) { | |
return existing_collector; | |
} | |
checkNamedOutputName(namedOutput); | |
boolean foundMatch = false; | |
Iterator<String> namedOutputsIter = getNamedOutputs(); | |
while (namedOutputsIter.hasNext()) { | |
if (namedOutputsIter.next().equals(namedOutput)) { | |
foundMatch = true; | |
} | |
} | |
if (!foundMatch) { | |
throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'"); | |
} | |
if (!multi && multiName != null) { | |
throw new IllegalArgumentException( | |
"Name output '" + namedOutput + "' has not been defined as multi"); | |
} | |
if (multi) { | |
checkTokenName(multiName); | |
} | |
MockTextCollector mockTextCollector = new MockTextCollector(conf, nullTextReplacement); | |
collectors.put(baseFileName, mockTextCollector); | |
return mockTextCollector; | |
} | |
private static void checkTokenName(String namedOutput) { | |
if (namedOutput == null || namedOutput.length() == 0) { | |
throw new IllegalArgumentException("Name cannot be NULL or emtpy"); | |
} | |
for (char ch : namedOutput.toCharArray()) { | |
if ((ch >= 'A') && (ch <= 'Z')) { | |
continue; | |
} | |
if ((ch >= 'a') && (ch <= 'z')) { | |
continue; | |
} | |
if ((ch >= '0') && (ch <= '9')) { | |
continue; | |
} | |
throw new IllegalArgumentException("Name cannot be have a '" + ch + "' char"); | |
} | |
} | |
private static void checkNamedOutputName(String namedOutput) { | |
checkTokenName(namedOutput); | |
/* Name cannot be the name used for the default output */ | |
if (namedOutput.equals("part")) { | |
throw new IllegalArgumentException("Named output name cannot be 'part'"); | |
} | |
} | |
public void clear() { | |
collectors = new HashMap<String, MockTextCollector>(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment