Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Secondary sort in mapreduce - Includes code for a simple program that sorts employee information by department ascending and employee name desc.
Secondary sort in Mapreduce
With mapreduce framework, the keys are sorted but the values associated with each key
are not. In order for the values to be sorted, we need to write code to perform what is
referred to a secondary sort. The sample code in this gist demonstrates such a sort.
The input to the program is a bunch of employee attributes.
The output required is department number (deptNo) in ascending order, and the employee last name,
first name and employee ID in descending order.
The recipe to get the effect of sorting by value is:
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo).
2) The sort comparator should order by the composite key, that is, the natural key and natural
value.
3) The partitioner and grouping comparator for the composite key should consider only the natural
key for partitioning and grouping.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
sortProject
data
employees_tsv
employees_tsv
SecondarySortBasic
src
CompositeKeyWritable.java
SecondarySortBasicMapper.java
SecondarySortBasicPartitioner.java
SecondarySortBasicCompKeySortComparator.java
SecondarySortBasicGroupingComparator.java
SecondarySortBasicReducer.java
SecondarySortBasicDriver.java
jar
SecondarySortBasic.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
....
*******************************
*Expected results
*******************************
Sort order: [DeptID asc, {LName,FName,EmpID} desc]
DeptID LName FName EmpID
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
/***************************************************************
*CustomWritable for the composite key: CompositeKeyWritable
****************************************************************/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
*
* @author akhanolkar
*
* Purpose: A custom writable with two attributes- deptNo and
* NameEmpIDPair;
*/
public class CompositeKeyWritable implements Writable,
WritableComparable<CompositeKeyWritable> {
private String deptNo;
private String lNameEmpIDPair;
public CompositeKeyWritable() {
}
public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) {
this.deptNo = deptNo;
this.lNameEmpIDPair = lNameEmpIDPair;
}
@Override
public String toString() {
return (new StringBuilder().append(deptNo).append("\t")
.append(lNameEmpIDPair)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
deptNo = WritableUtils.readString(dataInput);
lNameEmpIDPair = WritableUtils.readString(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, deptNo);
WritableUtils.writeString(dataOutput, lNameEmpIDPair);
}
public int compareTo(CompositeKeyWritable objKeyPair) {
// TODO:
/*
* Note: This code will work as it stands; but when CompositeKeyWritable
* is used as key in a map-reduce program, it is de-serialized into an
* object for comapareTo() method to be invoked;
*
* To do: To optimize for speed, implement a raw comparator - will
* support comparison of serialized representations
*/
int result = deptNo.compareTo(objKeyPair.deptNo);
if (0 == result) {
result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair);
}
return result;
}
public String getDeptNo() {
return deptNo;
}
public void setDeptNo(String deptNo) {
this.deptNo = deptNo;
}
public String getLNameEmpIDPair() {
return lNameEmpIDPair;
}
public void setLNameEmpIDPair(String lNameEmpIDPair) {
this.lNameEmpIDPair = lNameEmpIDPair;
}
}
/***************************************************************
*Mapper: SecondarySortBasicMapper
***************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortBasicMapper extends
Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
context.write(
new CompositeKeyWritable(
arrEmpAttributes[6].toString(),
(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
.toString())), NullWritable.get());
}
}
}
/***************************************************************
*Partitioner: SecondarySortBasicPartitioner
***************************************************************/
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondarySortBasicPartitioner extends
Partitioner<CompositeKeyWritable, NullWritable> {
@Override
public int getPartition(CompositeKeyWritable key, NullWritable value,
int numReduceTasks) {
return (key.getDeptNo().hashCode() % numReduceTasks);
}
}
/***************************************************************
*SortComparator: SecondarySortBasicCompKeySortComparator
*****************************************************************/
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicCompKeySortComparator extends WritableComparator {
protected SecondarySortBasicCompKeySortComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo());
if (cmpResult == 0)// same deptNo
{
return -key1.getLNameEmpIDPair()
.compareTo(key2.getLNameEmpIDPair());
//If the minus is taken out, the values will be in
//ascending order
}
return cmpResult;
}
}
***************************************************************
*GroupingComparator: SecondarySortBasicGroupingComparator
***************************************************************
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicGroupingComparator extends WritableComparator {
protected SecondarySortBasicGroupingComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
return key1.getDeptNo().compareTo(key2.getDeptNo());
}
}
***************************************
*Reducer: SecondarySortBasicReducer
***************************************
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortBasicReducer
extends
Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> {
@Override
public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key, NullWritable.get());
}
}
}
***************************************
*Driver: SecondarySortBasicDriver
***************************************
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySortBasicDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for SecondarySortBasicDriver- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Secondary sort example");
job.setJarByClass(SecondarySortBasicDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(SecondarySortBasicMapper.class);
job.setMapOutputKeyClass(CompositeKeyWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setPartitionerClass(SecondarySortBasicPartitioner.class);
job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class);
job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class);
job.setReducerClass(SecondarySortBasicReducer.class);
job.setOutputKeyClass(CompositeKeyWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(8);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new SecondarySortBasicDriver(), args);
System.exit(exitCode);
}
}
*******************************
*Command to run the program
*******************************
hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic
*******************************
*Results
*******************************
--Source record count
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l
2246830
--Results record count
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l
2246830
--Files generated
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}'
sortProject/data/output-secondarySortBasic/part-r-00000
sortProject/data/output-secondarySortBasic/part-r-00001
sortProject/data/output-secondarySortBasic/part-r-00002
sortProject/data/output-secondarySortBasic/part-r-00003
sortProject/data/output-secondarySortBasic/part-r-00004
sortProject/data/output-secondarySortBasic/part-r-00005
sortProject/data/output-secondarySortBasic/part-r-00006
sortProject/data/output-secondarySortBasic/part-r-00007
--Output
hadoop fs -cat sortProject/data/output-secondarySortBasic/part*
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
**********************
Reference:
**********************
Hadoop the definitive guide, 3rd edition
**********************
Credits:
**********************
Data from mysql - http://dev.mysql.com/doc/employee/en.index.html
@alpeshrpatel

This comment has been minimized.

Copy link

alpeshrpatel commented Apr 16, 2014

awesome

@harshavmb

This comment has been minimized.

Copy link

harshavmb commented Aug 18, 2015

super. Great explanation!

@bigbossmondher

This comment has been minimized.

Copy link

bigbossmondher commented Oct 21, 2015

Hi cann you help me please, i have to dataset merged in one file, the outpute file contain this

@Attribute

@dataa

@Attribute

@DaTa

i want to sort this file and have output file with this structure
@atribute
-----(combined together)
@DaTa
--- combined totgether
can you helpme to achine this step please ?

@BrandonSollins

This comment has been minimized.

Copy link

BrandonSollins commented May 3, 2016

Thanks for sharing, this was super helpful!

@sheesho99

This comment has been minimized.

Copy link

sheesho99 commented Dec 8, 2018

Great work !! Super expln !!
pls attach jar file and .java and emp.txt with any other files so we can run and test on local Hadoop.....
if we have 2 jar files 1 ASC sort and 1 DESC sort - how do we execute with Emp.txt - need commands

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.