public
Created

Fix for AvroMultipleOutputs: store each named output's key/value schema in the job Configuration instead of in static maps

  • Download Gist
patch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
Index: mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
===================================================================
--- mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (revision 1440190)
+++ mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (working copy)
@@ -133,6 +133,8 @@
private static final String COUNTERS_ENABLED =
"avro.mapreduce.multipleoutputs.counters";
+ private static final String KEY_SCHEMA = ".keySchema";
+ private static final String VALUE_SCHEMA = ".valueSchema";
/**
* Counters group used by the counters of MultipleOutputs.
*/
@@ -144,16 +146,6 @@
private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
/**
- * Cache for the Key Schemas
- */
- private static Map<String, Schema> keySchemas = new HashMap<String, Schema>();
-
- /**
- * Cache for the Value Schemas
- */
- private static Map<String, Schema> valSchemas = new HashMap<String, Schema>();
-
- /**
* Checks if a named output name is valid token.
*
* @param namedOutput named output Name
@@ -272,8 +264,10 @@
conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
OutputFormat.class);
- keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema);
- valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema);
+ conf.set(MO_PREFIX + namedOutput + KEY_SCHEMA, keySchema.toString());
+ if(valueSchema != null){
+ conf.set(MO_PREFIX + namedOutput + VALUE_SCHEMA, valueSchema.toString());
+ }
}
@@ -473,13 +467,19 @@
return taskContext;
}
+ Configuration configuration = context.getConfiguration();
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
- context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput);
- Job job = new Job(context.getConfiguration());
+
+ configuration.set("avro.mo.config.namedOutput",nameOutput);
+ Job job = new Job(configuration);
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
- Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA");
- Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA");
+ String keySchemaDefinition = configuration.get(MO_PREFIX + nameOutput + KEY_SCHEMA);
+ String valueSchemaDefinition = configuration.get(MO_PREFIX + nameOutput + VALUE_SCHEMA,null);
+
+ Schema.Parser parser = new Schema.Parser();
+ Schema keySchema = parser.parse(keySchemaDefinition);
+ Schema valSchema = valueSchemaDefinition != null ? parser.parse(valueSchemaDefinition) : null;
boolean isMaponly=job.getNumReduceTasks() == 0;

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.