Skip to content

Instantly share code, notes, and snippets.

@baunz
Created January 30, 2013 22:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save baunz/4677875 to your computer and use it in GitHub Desktop.
Save baunz/4677875 to your computer and use it in GitHub Desktop.
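Fix for AvroMultipleOutputs: store each named output's key and value schemas in the job Configuration instead of in static maps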
Index: mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
===================================================================
--- mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (revision 1440190)
+++ mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (working copy)
@@ -133,6 +133,8 @@
private static final String COUNTERS_ENABLED =
"avro.mapreduce.multipleoutputs.counters";
+ private static final String KEY_SCHEMA = ".keySchema";
+ private static final String VALUE_SCHEMA = ".valueSchema";
/**
* Counters group used by the counters of MultipleOutputs.
*/
@@ -144,16 +146,6 @@
private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
/**
- * Cache for the Key Schemas
- */
- private static Map<String, Schema> keySchemas = new HashMap<String, Schema>();
-
- /**
- * Cache for the Value Schemas
- */
- private static Map<String, Schema> valSchemas = new HashMap<String, Schema>();
-
- /**
* Checks if a named output name is valid token.
*
* @param namedOutput named output Name
@@ -272,8 +264,10 @@
conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
OutputFormat.class);
- keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema);
- valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema);
+ conf.set(MO_PREFIX + namedOutput + KEY_SCHEMA, keySchema.toString());
+ if(valueSchema != null){
+ conf.set(MO_PREFIX + namedOutput + VALUE_SCHEMA, valueSchema.toString());
+ }
}
@@ -473,13 +467,19 @@
return taskContext;
}
+ Configuration configuration = context.getConfiguration();
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
- context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput);
- Job job = new Job(context.getConfiguration());
+
+ configuration.set("avro.mo.config.namedOutput",nameOutput);
+ Job job = new Job(configuration);
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
- Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA");
- Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA");
+ String keySchemaDefinition = configuration.get(MO_PREFIX + nameOutput + KEY_SCHEMA);
+ String valueSchemaDefinition = configuration.get(MO_PREFIX + nameOutput + VALUE_SCHEMA,null);
+
+ Schema.Parser parser = new Schema.Parser();
+ Schema keySchema = parser.parse(keySchemaDefinition);
+ Schema valSchema = valueSchemaDefinition != null ? parser.parse(valueSchemaDefinition) : null;
boolean isMaponly=job.getNumReduceTasks() == 0;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment