ReduceSideJoin - Sample Java MapReduce program for joining datasets with a cardinality of 1..1 and 1..many on the join key
My blog has an introduction to reduce-side joins in Java MapReduce:
http://hadooped.blogspot.com/2013/09/reduce-side-join-options-in-java-map.html
***************************
Employee Dataset
***************************
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
.......
***************************
Salary Dataset - Current
[Text format]
***************************
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
.......
***************************
Salary Dataset - Historical
[Text format]
***************************
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10001,85097,2001-06-22,2002-06-22
10001,85112,2000-06-22,2001-06-22
10001,84917,1999-06-23,2000-06-22
10001,81097,1998-06-23,1999-06-23
10001,81025,1997-06-23,1998-06-23
10001,80013,1996-06-23,1997-06-23
10001,76884,1995-06-24,1996-06-23
10001,75994,1994-06-24,1995-06-24
10001,75286,1993-06-24,1994-06-24
10001,74333,1992-06-24,1993-06-24
10001,71046,1991-06-25,1992-06-24
10001,66961,1990-06-25,1991-06-25
10001,66596,1989-06-25,1990-06-25
10001,66074,1988-06-25,1989-06-25
10001,62102,1987-06-26,1988-06-25
10001,60117,1986-06-26,1987-06-26
10002,72527,2001-08-02,9999-01-01
10002,71963,2000-08-02,2001-08-02
10002,69366,1999-08-03,2000-08-02
10002,67534,1998-08-03,1999-08-03
10002,65909,1997-08-03,1998-08-03
10002,65828,1996-08-03,1997-08-03
10003,43311,2001-12-01,9999-01-01
10003,43699,2000-12-01,2001-12-01
10003,43478,1999-12-02,2000-12-01
10003,43636,1998-12-02,1999-12-02
10003,43466,1997-12-02,1998-12-02
10003,43616,1996-12-02,1997-12-02
10003,40006,1995-12-03,1996-12-02
.......
********************************
Department Dataset
[Map file]
********************************
+---------+--------------------+
| dept_no | dept_name          |
+---------+--------------------+
| d009    | Customer Service   |
| d005    | Development        |
| d002    | Finance            |
| d003    | Human Resources    |
| d001    | Marketing          |
| d004    | Production         |
| d006    | Quality Management |
| d008    | Research           |
| d007    | Sales              |
+---------+--------------------+
.......
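The reducer further below looks up department names from this dataset, packaged as a Hadoop MapFile and shipped as side data (departments_map.tar.gz). The gist does not show how that MapFile was created; the following is a minimal sketch of one way to build it, assuming Text keys and values and an output directory named departments_map (class name and path are illustrative, not from the gist). MapFile.Writer requires keys to be appended in sorted order, which the TreeMap guarantees.

package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
public class DepartmentsMapFileBuilder {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // TreeMap keeps dept_no keys sorted, as MapFile.Writer requires
        TreeMap<String, String> depts = new TreeMap<String, String>();
        depts.put("d001", "Marketing");
        depts.put("d002", "Finance");
        depts.put("d003", "Human Resources");
        depts.put("d004", "Production");
        depts.put("d005", "Development");
        depts.put("d006", "Quality Management");
        depts.put("d007", "Sales");
        depts.put("d008", "Research");
        depts.put("d009", "Customer Service");
        // "departments_map" is an illustrative output directory name
        MapFile.Writer writer = new MapFile.Writer(conf, fs,
                "departments_map", Text.class, Text.class);
        try {
            for (Map.Entry<String, String> e : depts.entrySet()) {
                writer.append(new Text(e.getKey()), new Text(e.getValue()));
            }
        } finally {
            writer.close();
        }
    }
}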
//********************************************************************************
//Class: CompositeKeyWritableRSJ
//Purpose: Custom Writable that serves as composite key
// with attributes joinKey and sourceIndex
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
public class CompositeKeyWritableRSJ implements Writable,
WritableComparable<CompositeKeyWritableRSJ> {
// Data members
private String joinKey;// EmployeeID
private int sourceIndex;// 1=Employee data; 2=Salary (current) data; 3=Salary historical data
public CompositeKeyWritableRSJ() {
}
public CompositeKeyWritableRSJ(String joinKey, int sourceIndex) {
this.joinKey = joinKey;
this.sourceIndex = sourceIndex;
}
@Override
public String toString() {
return (new StringBuilder().append(joinKey).append("\t")
.append(sourceIndex)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
joinKey = WritableUtils.readString(dataInput);
sourceIndex = WritableUtils.readVInt(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, joinKey);
WritableUtils.writeVInt(dataOutput, sourceIndex);
}
public int compareTo(CompositeKeyWritableRSJ objKeyPair) {
int result = joinKey.compareTo(objKeyPair.joinKey);
if (0 == result) {
result = Integer.compare(sourceIndex, objKeyPair.sourceIndex);
}
return result;
}
public String getjoinKey() {
return joinKey;
}
public void setjoinKey(String joinKey) {
this.joinKey = joinKey;
}
public int getsourceIndex() {
return sourceIndex;
}
public void setsourceIndex(int sourceIndex) {
this.sourceIndex = sourceIndex;
}
}
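Not part of the original gist: a quick sanity check of the Writable round trip using Hadoop's DataOutputBuffer and DataInputBuffer (the class name below is illustrative). Serializing an instance with write() and reading it back with readFields() should reproduce the same joinKey and sourceIndex.

package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
public class CompositeKeyWritableRSJCheck {
    public static void main(String[] args) throws Exception {
        CompositeKeyWritableRSJ original = new CompositeKeyWritableRSJ("10001", 2);
        // Serialize via write()
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);
        // Deserialize via readFields()
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        CompositeKeyWritableRSJ copy = new CompositeKeyWritableRSJ();
        copy.readFields(in);
        System.out.println(copy); // expected: 10001<tab>2
    }
}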
//********************************************************************************
//Class: MapperRSJ
//Purpose: Mapper
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MapperRSJ extends
Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {
CompositeKeyWritableRSJ ckwKey = new CompositeKeyWritableRSJ();
Text txtValue = new Text("");
int intSrcIndex = 0;
StringBuilder strMapValueBuilder = new StringBuilder("");
List<Integer> lstRequiredAttribList = new ArrayList<Integer>();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// {{
// Get the source index (1=employee, 2=current salary, 3=historical salary);
// added as configuration in the driver, keyed by input file name
FileSplit fsFileSplit = (FileSplit) context.getInputSplit();
intSrcIndex = Integer.parseInt(context.getConfiguration().get(
fsFileSplit.getPath().getName()));
// }}
// {{
// Initialize the list of fields to emit as output based on
// intSrcIndex (1=employee, 2=current salary, 3=historical salary)
if (intSrcIndex == 1) // employee
{
lstRequiredAttribList.add(2); // FName
lstRequiredAttribList.add(3); // LName
lstRequiredAttribList.add(4); // Gender
lstRequiredAttribList.add(6); // DeptNo
} else // salary
{
lstRequiredAttribList.add(1); // Salary
lstRequiredAttribList.add(3); // Effective-to-date (Value of
// 9999-01-01 indicates current
// salary)
}
// }}
}
private String buildMapValue(String arrEntityAttributesList[]) {
// This method returns csv list of values to emit based on data entity
strMapValueBuilder.setLength(0);// Initialize
// Build list of attributes to output based on source - employee/salary
for (int i = 1; i < arrEntityAttributesList.length; i++) {
// If the field is in the list of required output
// append to stringbuilder
if (lstRequiredAttribList.contains(i)) {
strMapValueBuilder.append(arrEntityAttributesList[i]).append(
",");
}
}
if (strMapValueBuilder.length() > 0) {
// Drop last comma
strMapValueBuilder.setLength(strMapValueBuilder.length() - 1);
}
return strMapValueBuilder.toString();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEntityAttributes[] = value.toString().split(",");
ckwKey.setjoinKey(arrEntityAttributes[0].toString());
ckwKey.setsourceIndex(intSrcIndex);
txtValue.set(buildMapValue(arrEntityAttributes));
context.write(ckwKey, txtValue);
}
}
}
//********************************************************************************
//Class: PartitionerRSJ
//Purpose: Custom partitioner
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionerRSJ extends Partitioner<CompositeKeyWritableRSJ, Text> {
@Override
public int getPartition(CompositeKeyWritableRSJ key, Text value,
int numReduceTasks) {
// Partitions on joinKey (EmployeeID); mask the sign bit so the
// result is never negative for negative hash codes
return (key.getjoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//********************************************************************************
//Class: SortingComparatorRSJ
//Purpose: Sorting comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class SortingComparatorRSJ extends WritableComparator {
protected SortingComparatorRSJ() {
super(CompositeKeyWritableRSJ.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// Sort on all attributes of composite key
CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
int cmpResult = key1.getjoinKey().compareTo(key2.getjoinKey());
if (cmpResult == 0)// same joinKey
{
return Integer.compare(key1.getsourceIndex(), key2.getsourceIndex());
}
return cmpResult;
}
}
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//********************************************************************************
//Class: GroupingComparatorRSJ
//Purpose: For use as grouping comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class GroupingComparatorRSJ extends WritableComparator {
protected GroupingComparatorRSJ() {
super(CompositeKeyWritableRSJ.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// The grouping comparator is the joinKey (Employee ID)
CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
return key1.getjoinKey().compareTo(key2.getjoinKey());
}
}
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//********************************************************************************
//Class: ReducerRSJ
//Purpose: Reducer
//Author: Anagha Khanolkar
//*********************************************************************************
public class ReducerRSJ extends
Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text> {
StringBuilder reduceValueBuilder = new StringBuilder("");
NullWritable nullWritableKey = NullWritable.get();
Text reduceOutputValue = new Text("");
String strSeparator = ",";
private MapFile.Reader deptMapReader = null;
Text txtMapFileLookupKey = new Text("");
Text txtMapFileLookupValue = new Text("");
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// {{
// Get side data from the distributed cache
Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim()
.equals("departments_map.tar.gz")) {
URI uriUncompressedFile = new File(eachPath.toString()
+ "/departments_map").toURI();
initializeDepartmentsMap(uriUncompressedFile, context);
}
}
// }}
}
@SuppressWarnings("deprecation")
private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
throws IOException {
// {{
// Initialize the reader of the map file (side data)
FileSystem dfs = FileSystem.get(context.getConfiguration());
try {
deptMapReader = new MapFile.Reader(dfs,
uriUncompressedFile.toString(), context.getConfiguration());
} catch (Exception e) {
e.printStackTrace();
}
// }}
}
private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
StringBuilder reduceValueBuilder, Text value) {
if (key.getsourceIndex() == 1) {
// Employee data
// {{
// Get the department name from the MapFile in distributedCache
// Insert the joinKey (empNo) to beginning of the stringBuilder
reduceValueBuilder.append(key.getjoinKey()).append(strSeparator);
String arrEmpAttributes[] = value.toString().split(",");
txtMapFileLookupKey.set(arrEmpAttributes[3].toString());
try {
deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
} catch (Exception e) {
txtMapFileLookupValue.set("");
} finally {
// Text.equals(null)/equals("") never match; check the length instead so a
// failed lookup is flagged as NOT-FOUND
if (txtMapFileLookupValue.getLength() == 0) {
txtMapFileLookupValue.set("NOT-FOUND");
}
}
// }}
// {{
// Append the department name to the map values to form a complete
// CSV of employee attributes
reduceValueBuilder.append(value.toString()).append(strSeparator)
.append(txtMapFileLookupValue.toString())
.append(strSeparator);
// }}
} else if (key.getsourceIndex() == 2) {
// Current salary data (1..1 on join key);
// append just the salary, drop the effective-to-date
String arrSalAttributes[] = value.toString().split(",");
reduceValueBuilder.append(arrSalAttributes[0].toString()).append(
strSeparator);
} else // key.getsourceIndex() == 3; Historical salary data
{
// {{
// Get the salary data but extract only current salary
// (to_date='9999-01-01')
String arrSalAttributes[] = value.toString().split(",");
if (arrSalAttributes[1].toString().equals("9999-01-01")) {
// Salary data; Just append
reduceValueBuilder.append(arrSalAttributes[0].toString())
.append(strSeparator);
}
// }}
}
// {{
// Reset
txtMapFileLookupKey.set("");
txtMapFileLookupValue.set("");
// }}
return reduceValueBuilder;
}
@Override
public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
// Iterate through values; First set is csv of employee data
// second set is salary data; The data is already ordered
// by virtue of secondary sort; Append each value;
for (Text value : values) {
buildOutputValue(key, reduceValueBuilder, value);
}
// Drop last comma, set value, and emit output
if (reduceValueBuilder.length() > 1) {
reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
// Emit output
reduceOutputValue.set(reduceValueBuilder.toString());
context.write(nullWritableKey, reduceOutputValue);
} else {
System.out.println("Key=" + key.getjoinKey() + "src="
+ key.getsourceIndex());
}
// Reset variables
reduceValueBuilder.setLength(0);
reduceOutputValue.set("");
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
deptMapReader.close();
}
}
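For reference (derived from the sample records above, not shown in the original gist): with the employee file (sourceIndex 1) and the current-salary file (sourceIndex 2) as the two inputs, each line emitted by this reducer should be a CSV of the form
EmpNo,FName,LName,Gender,DeptNo,DeptName,Salary
e.g. 10001,Georgi,Facello,M,d005,Development,88958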
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//********************************************************************************
//Class: DriverRSJ
//Purpose: Driver for Reduce Side Join of two datasets
// with a 1..1 or 1..many cardinality on join key
//Author: Anagha Khanolkar
//*********************************************************************************
public class DriverRSJ extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// {{
// Exit job if required arguments have not been provided
if (args.length != 3) {
System.out
.printf("Three parameters are required for DriverRSJ- <input dir1> <input dir2> <output dir>\n");
return -1;
}
// }}
// {{
// Job instantiation
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJarByClass(DriverRSJ.class);
job.setJobName("ReduceSideJoin");
// }}
// {{
// Add side data to distributed cache
DistributedCache
.addCacheArchive(
new URI(
"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
conf);
// }}
// {
// Set sourceIndex for input files;
// sourceIndex is an attribute of the compositeKey,
// to drive order, and reference source
// Can be done dynamically; Hard-coded file names for simplicity
conf.setInt("part-e", 1);// Set Employee file to 1
conf.setInt("part-sc", 2);// Set Current salary file to 2
conf.setInt("part-sh", 3);// Set Historical salary file to 3
// }
// {
// Build csv list of input files
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(args[0].toString()).append(",")
.append(args[1].toString());
// }
// {{
// Configure remaining aspects of the job
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setMapperClass(MapperRSJ.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(PartitionerRSJ.class);
job.setSortComparatorClass(SortingComparatorRSJ.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(4);
job.setReducerClass(ReducerRSJ.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// }}
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new DriverRSJ(),
args);
System.exit(exitCode);
}
}
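The driver expects exactly three arguments (two input directories and one output directory), and the sourceIndex lookup in the mapper assumes the input files are named part-e, part-sc, and part-sh as configured above. A possible invocation, assuming the jar name and output path shown here are illustrative and the input paths follow the Pig scripts below:

$ hadoop jar reduceSideJoin.jar khanolkar.mapreduce.join.samples.reducesidejoin.DriverRSJ /user/akhanolk/joinProject/data/employees_active /user/akhanolk/joinProject/data/salaries_history /user/akhanolk/joinProject/output/RSJ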
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Without using any join optimizations
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo;
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Using a merge join (both inputs sorted on the join key)
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
sortedEmpDS = ORDER empDS by empNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
sortedSalDS = ORDER salDS by empNo;
joinedDS = join sortedEmpDS by empNo, sortedSalDS by empNo using 'merge';
finalDS = foreach joinedDS generate sortedEmpDS::empNo,sortedEmpDS::fName,sortedEmpDS::lName,sortedEmpDS::gender,sortedEmpDS::deptNo,sortedSalDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';
**********************
Output of the Pig scripts
**********************
$ hadoop fs -cat joinProject/output/pig-RSJ/part* | less
10001 Facello Georgi M d005 88958
10002 Simmel Bezalel F d007 72527
10003 Bamford Parto M d004 43311
10004 Koblick Chirstian M d004 74057
.........
@kaliavarun
This is gold, Anagha. Thanks a lot!
