Skip to content

Instantly share code, notes, and snippets.

@mlehman
Last active August 29, 2015 13:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mlehman/8772354 to your computer and use it in GitHub Desktop.
Save mlehman/8772354 to your computer and use it in GitHub Desktop.
Secondary Sort in Avro using Pair
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.Pair;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.RawComparator;
import java.util.ArrayList;
import java.util.List;
/**
 * Example of Secondary Sort in Avro using {@link Pair} with {@code org.apache.avro.mapreduce} to order values in the
 * reducer.
 *
 * <p>{@link Pair}'s own schema sorts by key only, because its {@code value} field is declared with
 * {@link Schema.Field.Order#IGNORE}. This comparator builds its own copy of the pair schema. In that copy the
 * {@code value} field keeps the default field order (ascending, per the Avro specification). Keys are therefore
 * compared by the pair key first and then by the pair value. Installed as the sort comparator only, this gives a
 * secondary sort.
 *
 * <p>Consider a job whose Mapper emits keys of type {@code AvroKey<Pair<K,V>>} and values of type
 * {@code AvroValue<T>}. If grouping still uses the key-only comparator that AvroJob installs, the Reducer receives:
 * <ul>
 *   <li>keys grouped by the pair key ({@code K});</li>
 *   <li>values ({@code AvroValue<T>}) ordered by the pair value ({@code V}).</li>
 * </ul>
 *
 * <p>In your job:
 * <pre>{@code
 * // Set the map output key schema to your pair schema.
 * // AvroJob sets both the sort and grouping comparators to AvroKeyComparator, which compares the Pair's key only.
 * AvroJob.setMapOutputKeySchema(job,
 *     Pair.getPairSchema(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT)));
 *
 * // Then override the sort comparator only; grouping keeps using AvroKeyComparator.
 * job.setSortComparatorClass(AvroSecondarySortComparator.class);
 * }</pre>
 *
 * @param <K> type of the pair key (primary sort and grouping key)
 * @param <V> type of the pair value (secondary sort key)
 */
public class AvroSecondarySortComparator<K,V> extends Configured implements RawComparator<AvroKey<Pair<K,V>>> {
  /** Name of the key field in the pair record schema. */
  public static final String KEY = "key";
  /** Name of the value field in the pair record schema. */
  public static final String VALUE = "value";

  /**
   * Pair schema in which both fields take part in ordering.
   * It stays {@code null} until {@link #setConf(Configuration)} is called with a non-null configuration.
   */
  Schema schema;

  /**
   * Stores the configuration and derives the sortable pair schema from the job's map output key schema.
   *
   * @param conf the job configuration. When {@code null} (as passed by the {@link Configured} no-arg
   *     constructor), it is only stored and no schema is derived.
   * @throws IllegalStateException if {@code conf} has no map output key schema, or that schema is not a
   *     record with {@code key} and {@code value} fields
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null) {
      Schema pairSchema = AvroJob.getMapOutputKeySchema(conf);
      if (pairSchema == null) {
        throw new IllegalStateException(
            "No map output key schema is configured; call AvroJob.setMapOutputKeySchema before using "
                + AvroSecondarySortComparator.class.getSimpleName());
      }
      schema = getSortableValuePair(requireField(pairSchema, KEY), requireField(pairSchema, VALUE));
    }
  }

  /** Returns the schema of field {@code name} of {@code pairSchema}, failing clearly when it is absent. */
  private static Schema requireField(Schema pairSchema, String name) {
    if (pairSchema.getType() != Schema.Type.RECORD) {
      throw new IllegalStateException("Map output key schema must be a Pair record schema but was: " + pairSchema);
    }
    Schema.Field field = pairSchema.getField(name);
    if (field == null) {
      throw new IllegalStateException(
          "Map output key schema has no '" + name + "' field; expected a Pair schema but was: " + pairSchema);
    }
    return field.schema();
  }

  /**
   * Builds a record schema shaped like {@link Pair}'s, in which both fields take part in ordering.
   *
   * @param key schema of the pair key
   * @param value schema of the pair value
   * @return a record schema with fields {@code key} then {@code value}, both using the default field order
   */
  private Schema getSortableValuePair(Schema key, Schema value) {
    Schema pair = Schema.createRecord(Pair.class.getName(), null, null, false);
    List<Schema.Field> fields = new ArrayList<Schema.Field>();
    fields.add(new Schema.Field(KEY, key, "", null));
    // Pair declares this field with Schema.Field.Order.IGNORE; leaving the order unset makes the value sortable.
    fields.add(new Schema.Field(VALUE, value, "", null));
    pair.setFields(fields);
    return pair;
  }

  /** Returns the configured sort schema, failing fast if {@link #setConf(Configuration)} never supplied one. */
  private Schema sortSchema() {
    Schema s = schema;
    if (s == null) {
      throw new IllegalStateException(
          AvroSecondarySortComparator.class.getSimpleName() + " used before setConf supplied a configuration");
    }
    return s;
  }

  /**
   * Compares two serialized pairs, ordering by key first and then by value.
   *
   * @throws IllegalStateException if no configuration has been supplied
   */
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return BinaryData.compare(b1, s1, l1, b2, s2, l2, sortSchema());
  }

  /**
   * Compares two deserialized pairs, ordering by key first and then by value.
   *
   * @throws IllegalStateException if no configuration has been supplied
   */
  @Override
  public int compare(AvroKey<Pair<K,V>> x, AvroKey<Pair<K,V>> y) {
    return ReflectData.get().compare(x.datum(), y.datum(), sortSchema());
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment