Created January 30, 2013 22:32
-
-
Save baunz/4677875 to your computer and use it in GitHub Desktop.
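Fix for AvroMultipleOutputs with multiple named outputs: store each named output's key/value schema in the job Configuration instead of in static in-memory maps.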
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java | |
=================================================================== | |
--- mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (revision 1440190) | |
+++ mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (working copy) | |
@@ -133,6 +133,8 @@ | |
private static final String COUNTERS_ENABLED = | |
"avro.mapreduce.multipleoutputs.counters"; | |
+ private static final String KEY_SCHEMA = ".keySchema"; | |
+ private static final String VALUE_SCHEMA = ".valueSchema"; | |
/** | |
* Counters group used by the counters of MultipleOutputs. | |
*/ | |
@@ -144,16 +146,6 @@ | |
private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>(); | |
/** | |
- * Cache for the Key Schemas | |
- */ | |
- private static Map<String, Schema> keySchemas = new HashMap<String, Schema>(); | |
- | |
- /** | |
- * Cache for the Value Schemas | |
- */ | |
- private static Map<String, Schema> valSchemas = new HashMap<String, Schema>(); | |
- | |
- /** | |
* Checks if a named output name is valid token. | |
* | |
* @param namedOutput named output Name | |
@@ -272,8 +264,10 @@ | |
conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput); | |
conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, | |
OutputFormat.class); | |
- keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema); | |
- valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema); | |
+ conf.set(MO_PREFIX + namedOutput + KEY_SCHEMA, keySchema.toString()); | |
+ if(valueSchema != null){ | |
+ conf.set(MO_PREFIX + namedOutput + VALUE_SCHEMA, valueSchema.toString()); | |
+ } | |
} | |
@@ -473,13 +467,19 @@ | |
return taskContext; | |
} | |
+ Configuration configuration = context.getConfiguration(); | |
// The following trick leverages the instantiation of a record writer via | |
// the job thus supporting arbitrary output formats. | |
- context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput); | |
- Job job = new Job(context.getConfiguration()); | |
+ | |
+ configuration.set("avro.mo.config.namedOutput",nameOutput); | |
+ Job job = new Job(configuration); | |
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput)); | |
- Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA"); | |
- Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA"); | |
+ String keySchemaDefinition = configuration.get(MO_PREFIX + nameOutput + KEY_SCHEMA); | |
+ String valueSchemaDefinition = configuration.get(MO_PREFIX + nameOutput + VALUE_SCHEMA,null); | |
+ | |
+ Schema.Parser parser = new Schema.Parser(); | |
+ Schema keySchema = parser.parse(keySchemaDefinition); | |
+ Schema valSchema = valueSchemaDefinition != null ? parser.parse(valueSchemaDefinition) : null; | |
boolean isMaponly=job.getNumReduceTasks() == 0; | |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.