Skip to content

Instantly share code, notes, and snippets.

@nowell-jana
Last active August 29, 2015 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nowell-jana/11275829 to your computer and use it in GitHub Desktop.
Save nowell-jana/11275829 to your computer and use it in GitHub Desktop.
public class TestCreateEffectiveSessions {
// Reduce driver used by testReducer(); it is instantiated inside that test, not in setUp().
private MultipleTextOutputsReduceDriver<AvroKey<Utf8>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> reduce_driver;
// Job configuration populated in setUp() and handed to the driver and the mock outputs.
private JobConf conf;
// Key text passed to MockMultipleTextOutputs and used as the key of every expected text pair
// in testReducer(). NOTE(review): the name suggests the real key is expected to be null — confirm
// against MockMultipleTextOutputs.
private static final Text should_be_null = new Text("should_be_null");
/**
 * Prepares the {@link JobConf} shared by the tests in this class.
 *
 * <p>The configuration uses the Avro input format, the TID_OMNIBUS schema for both
 * input and output, a (string, TID_OMNIBUS) pair schema for map output,
 * {@link CreateEffectiveSessionsReducer} as the reducer, and a multi named output
 * called "text".
 */
@Before
public void setUp() {
    Schema omnibusSchema = EffectiveSessionSchema.TID_OMNIBUS;

    conf = new JobConf();
    conf.setInputFormat(AvroInputFormat.class);
    AvroJob.setInputSchema(conf, omnibusSchema);

    // Map output pairs an Avro string key with a TID_OMNIBUS value.
    // The Avro Pair class is fully qualified on purpose: the unqualified name
    // "Pair" is used for a different type elsewhere in this class.
    Schema stringKeySchema = Schema.create(Schema.Type.STRING);
    Schema mapOutputSchema =
        org.apache.avro.mapred.Pair.getPairSchema(stringKeySchema, omnibusSchema);
    AvroJob.setMapOutputSchema(conf, mapOutputSchema);

    AvroJob.setOutputSchema(conf, omnibusSchema);
    conf.setReducerClass(CreateEffectiveSessionsReducer.class);
    MockMultipleTextOutputs.addMultiNamedOutput(conf, "text");
}
/**
 * Drives {@link CreateEffectiveSessionsReducer} with a single key/value group and
 * checks both its Avro output and its named text outputs.
 *
 * <p>The reducer's {@code ExternalLookupTool} is replaced with
 * {@link MockExternalLookupTool}. The expected named outputs are "text_combined",
 * "text_info" and "text_offer"; the "text_log" named output is ignored.
 *
 * @throws IOException declared by the original test signature
 */
@Test
public void testReducer() throws IOException {
    // Reducer under test, with its external lookup swapped for a mock.
    CreateEffectiveSessionsReducer reducerUnderTest = new CreateEffectiveSessionsReducer();
    ReflectUtils.injectMockByClass(
        ExternalLookupTool.class, reducerUnderTest, new MockExternalLookupTool());

    // Driver wiring: configuration first, then the mock multiple-outputs sink.
    reduce_driver =
        MultipleTextOutputsReduceDriver.newMultipleOutputsReduceDriver(reducerUnderTest);
    reduce_driver.setConfiguration(conf);
    reduce_driver.registerMultipleOutputs(new MockMultipleTextOutputs(conf, should_be_null));

    // Input: one key (tid_1) carrying a single omnibus record.
    List<AvroValue<GenericRecord>> recordsForKey = new ArrayList<AvroValue<GenericRecord>>();
    recordsForKey.add(new AvroValue<GenericRecord>(omnibus_in_1));
    Pair<AvroKey<Utf8>, List<AvroValue<GenericRecord>>> inputGroup =
        new Pair<AvroKey<Utf8>, List<AvroValue<GenericRecord>>>(
            new AvroKey<Utf8>(tid_1), recordsForKey);
    List<Pair<AvroKey<Utf8>, List<AvroValue<GenericRecord>>>> inputGroups =
        new ArrayList<Pair<AvroKey<Utf8>, List<AvroValue<GenericRecord>>>>();
    inputGroups.add(inputGroup);

    // Expected main output: one omnibus record keyed output with a NullWritable value.
    Pair<AvroKey<GenericRecord>, NullWritable> expectedRecord =
        new Pair<AvroKey<GenericRecord>, NullWritable>(
            new AvroKey<GenericRecord>(omnibus_out_1), NullWritable.get());
    List<Pair<AvroKey<GenericRecord>, NullWritable>> expectedRecords =
        new ArrayList<Pair<AvroKey<GenericRecord>, NullWritable>>();
    expectedRecords.add(expectedRecord);

    reduce_driver.addAll(inputGroups);
    // Output serialization is prepared before the expected outputs are registered,
    // matching the original call order.
    String outputKeySchemaJson = EffectiveSessionSchema.TID_OMNIBUS.toString();
    String outputValueSchemaJson = Schema.create(Type.NULL).toString();
    reduce_driver.prepareAvroSerializationForOutput(outputKeySchemaJson, outputValueSchemaJson);
    reduce_driver.addAllOutput(expectedRecords);

    // Expected named text outputs; every expected pair uses should_be_null as its key.
    TreeMap<String, List<Pair<Text, Text>>> expectedByNamedOutput =
        new TreeMap<String, List<Pair<Text, Text>>>();

    List<Pair<Text, Text>> expectedCombined = new ArrayList<Pair<Text, Text>>();
    for (Text combinedLine : text_combined_output) {
        expectedCombined.add(new Pair<Text, Text>(should_be_null, combinedLine));
    }
    expectedByNamedOutput.put("text_combined", expectedCombined);

    List<Pair<Text, Text>> expectedInfo = new ArrayList<Pair<Text, Text>>();
    for (Text infoLine : text_info_output) {
        expectedInfo.add(new Pair<Text, Text>(should_be_null, infoLine));
    }
    expectedByNamedOutput.put("text_info", expectedInfo);

    List<Pair<Text, Text>> expectedOffer = new ArrayList<Pair<Text, Text>>();
    for (Text offerLine : text_offer_output) {
        expectedOffer.add(new Pair<Text, Text>(should_be_null, offerLine));
    }
    expectedByNamedOutput.put("text_offer", expectedOffer);

    reduce_driver.addAllMultipleOutput(expectedByNamedOutput);
    reduce_driver.addIgnoredNamedOutput("text_log");

    reduce_driver.runTest();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment