@airawat
Last active December 23, 2017 07:11
MapsideJoinOfTwoLargeDatasets(Old API) - Joining (inner join) two large datasets on the map side
**********************
**Gist
**********************
This gist details how to inner join two large datasets on the map side, using the join support
in the MapReduce library (org.apache.hadoop.mapred.join). Such a join makes sense if both input
datasets are too large to distribute through the DistributedCache. It can be implemented only if
both datasets share the join key, are sorted by that key in the same order, and are partitioned
identically (same number of part files, with any given key in the same-numbered part file).
There are two critical pieces to engaging the join behavior:
- the input format must be set to CompositeInputFormat.class, and
- the property mapred.join.expr must have a value that is a valid join specification.
Sample program:
Covers inner join of employee and salary data with employee ID as join key in a map-only program
Inner join:
The inner join is a traditional database-style inner join. The map method will be called with a key/value
set only if every dataset in the join contains the key. The TupleWritable value will contain a value for
every dataset in the join, join key excluded.
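To make the inner-join semantics concrete, here is a minimal plain-Java sketch of a merge join over two key-sorted inputs. It is not Hadoop code and not how CompositeInputFormat is implemented; the class and method names are hypothetical, and the salary row for 10002 is deliberately left out to show a key being dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a merge join over two arrays sorted by numeric key.
// It mirrors the inner-join behavior described above, nothing more.
class SortedInnerJoinSketch {

    // Each row is {key, value}. Both inputs must be sorted ascending by key,
    // with at most one row per key (the 1..1 case).
    static List<String> innerJoin(String[][] left, String[][] right) {
        List<String> joined = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.length && j < right.length) {
            long leftKey = Long.parseLong(left[i][0]);
            long rightKey = Long.parseLong(right[j][0]);
            if (leftKey < rightKey) {
                i++; // key only in the left input: dropped by an inner join
            } else if (leftKey > rightKey) {
                j++; // key only in the right input: dropped by an inner join
            } else {
                // Key present in both inputs: emit the key plus both values.
                joined.add(leftKey + "\t" + left[i][1] + "\t" + right[j][1]);
                i++;
                j++;
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        String[][] employees = {{"10001", "Georgi"}, {"10002", "Bezalel"}, {"10003", "Parto"}};
        // 10002 is omitted on purpose, so it will not appear in the result.
        String[][] salaries = {{"10001", "88958"}, {"10003", "43311"}};
        for (String row : innerJoin(employees, salaries)) {
            System.out.println(row);
        }
    }
}
```

If either input is out of order, the two cursors move past matching keys without pairing them, which is why the sort requirement matters.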
Key code in the sample program:
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
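For orientation, the join specification is just a string. The sketch below uses a hypothetical helper, composeSketch, to mimic the shape I would expect CompositeInputFormat.compose to produce: op(tbl(<input format class>,"<path>"),...). Treat the exact format as an assumption and print strJoinStmt in the driver to confirm it on your Hadoop version.

```java
// Hypothetical stand-in for CompositeInputFormat.compose; it only builds a
// string of the assumed shape and does not touch Hadoop at all.
class JoinExpressionSketch {

    static String composeSketch(String op, String inputFormatClassName, String... paths) {
        StringBuilder expr = new StringBuilder(op).append('(');
        for (int i = 0; i < paths.length; i++) {
            if (i > 0) {
                expr.append(',');
            }
            // One tbl(...) term per input path, with the path in double quotes.
            expr.append("tbl(").append(inputFormatClassName)
                .append(",\"").append(paths[i]).append("\")");
        }
        return expr.append(')').toString();
    }

    public static void main(String[] args) {
        System.out.println(composeSketch("inner", "KeyValueLongInputFormat",
                "joinProject/data/employees_sorted/part-e",
                "joinProject/data/salaries_sorted/part-s"));
    }
}
```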
Old API:
I ended up using the old API as the new API does not include CompositeInputFormat in the version of
Hadoop I am running.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
joinProject
    data
        employees_sorted
            part-e
        salaries_sorted
            part-s
    MapSideJoinLargeDatasets
        src
            KeyValueLongInputFormat.java
            KeyValueLongLineRecordReader.java
            MapperMapSideJoinLargeDatasets.java
            DriverMapSideJoinLargeDatasets.java
        jar
            MapSideJoinLgDsOAPI.jar
********************************
Data Structure Review
********************************
Datasets:
The two datasets are employee and salary datasets.
Join key:
The join key is EmpNo/employee number
Location of join key:
The join key is the first field in both datasets
Sorting:
The data is sorted by the join key "EmpNo" in ascending order.
Sorting is crucial for accuracy of joins
File format:
The files are in text format, with comma as a separator
Cardinality:
Is 1..1 on join key; Both datasets have the same number of records
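Since the join depends on sorted input with one record per key, a quick sanity check can be useful before running the job. The following is a minimal sketch, assuming the keys have already been read into an array; the class and method names are hypothetical.

```java
// Illustrative check for the two properties listed above:
// keys sorted ascending, and no key repeated (1..1 cardinality).
class JoinInputCheckSketch {

    // True when every key is strictly greater than the one before it.
    static boolean isStrictlyAscending(long[] keys) {
        for (int i = 1; i < keys.length; i++) {
            if (keys[i - 1] >= keys[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] sortedKeys = {10001L, 10002L, 10003L, 10004L};
        long[] unsortedKeys = {10001L, 10003L, 10002L};
        System.out.println(isStrictlyAscending(sortedKeys));
        System.out.println(isStrictlyAscending(unsortedKeys));
    }
}
```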
Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....
Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
10004,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
........
******************************
Observations
******************************
Setting the input format to KeyValueTextInputFormat resulted in only
part of the data getting joined. I attributed this to the fact that
EmpNo is numeric while the join compares keys as Text, which orders them
byte by byte rather than by numeric value. Others had encountered the same
issue, and one individual had created a custom input format, KeyValueLongInputFormat,
and an associated record reader. This gist uses the same code, with minor modifications.
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
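The ordering mismatch is easy to reproduce outside Hadoop. The sketch below is only an illustration: it uses java.lang.String as a stand-in for Text (for ASCII digits both compare character by character), and the keys are made up to include different digit counts.

```java
import java.util.Arrays;

// Shows that text ordering and numeric ordering disagree
// once keys have different numbers of digits.
class KeyOrderingSketch {

    // Sort the keys the way a text comparison would.
    static String[] sortAsText(String[] keys) {
        String[] copy = keys.clone();
        Arrays.sort(copy);
        return copy;
    }

    // Sort the same keys by numeric value.
    static long[] sortAsLong(String[] keys) {
        long[] values = new long[keys.length];
        for (int i = 0; i < keys.length; i++) {
            values[i] = Long.parseLong(keys[i]);
        }
        Arrays.sort(values);
        return values;
    }

    public static void main(String[] args) {
        String[] keys = {"9999", "10001", "100000"};
        System.out.println(Arrays.toString(sortAsText(keys)));
        System.out.println(Arrays.toString(sortAsLong(keys)));
    }
}
```

With these keys the text sort yields 100000, 10001, 9999, while the numeric sort yields 9999, 10001, 100000. A file sorted numerically therefore looks unsorted to a join that compares Text keys.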
/**********************************
*KeyValueLongLineRecordReader.java
*Custom record reader
**********************************/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class KeyValueLongLineRecordReader implements
        RecordReader<LongWritable, Text> {

    private final LineRecordReader lineRecordReader;
    private byte separator = (byte) ',';
    private LongWritable dummyKey;
    private Text innerValue;

    public Class<LongWritable> getKeyClass() {
        return LongWritable.class;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
            throws IOException {
        lineRecordReader = new LineRecordReader(job, split);
        dummyKey = lineRecordReader.createKey();
        innerValue = lineRecordReader.createValue();
        // Separator between the key and the rest of the line; defaults to a comma.
        String sepStr = job.get("key.value.separator.in.input.line", ",");
        this.separator = (byte) sepStr.charAt(0);
    }

    /** Returns the index of the first separator byte, or -1 if there is none. */
    public static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); i++) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    /** Read key/value pair in a line. */
    public synchronized boolean next(LongWritable key, Text value)
            throws IOException {
        if (!lineRecordReader.next(dummyKey, innerValue)) {
            return false;
        }
        // Text.getBytes() may return a buffer longer than the data, so use getLength().
        byte[] line = innerValue.getBytes();
        int lineLen = innerValue.getLength();
        int pos = findSeparator(line, 0, lineLen, this.separator);
        if (pos == -1) {
            // No separator: the whole line is the key and the value is empty.
            key.set(Long.parseLong(Text.decode(line, 0, lineLen)));
            value.set("");
        } else {
            // The key is everything before the first separator; the value is the rest.
            key.set(Long.parseLong(Text.decode(line, 0, pos)));
            value.set(line, pos + 1, lineLen - pos - 1);
        }
        return true;
    }

    public float getProgress() {
        return lineRecordReader.getProgress();
    }

    public synchronized long getPos() throws IOException {
        return lineRecordReader.getPos();
    }

    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }
}
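The parsing rule in next() can be tried without a cluster. This is a minimal sketch of the same idea on a plain String, assuming a single-character separator; the class and method names are hypothetical and it is not a replacement for the record reader.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Splits a line at the first separator and parses the key as a long,
// mirroring what the record reader does on raw bytes.
class KeyValueLongParseSketch {

    static Map.Entry<Long, String> parse(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            // No separator: the whole line is the key, the value is empty.
            return new SimpleEntry<>(Long.parseLong(line), "");
        }
        // Key is the text before the first separator; value is everything after it.
        return new SimpleEntry<>(Long.parseLong(line.substring(0, pos)),
                line.substring(pos + 1));
    }

    public static void main(String[] args) {
        Map.Entry<Long, String> record = parse("10001,88958,2002-06-22,9999-01-01", ',');
        System.out.println(record.getKey() + " -> " + record.getValue());
    }
}
```

As in the record reader, a non-numeric first field makes Long.parseLong throw NumberFormatException, so header rows or blank lines need to be removed from the input beforehand.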
/**********************************
*KeyValueLongInputFormat.java
*Custom key value format
**********************************/
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class KeyValueLongInputFormat extends
        FileInputFormat<LongWritable, Text> implements JobConfigurable {

    private CompressionCodecFactory compressionCodecs = null;

    @Override
    public void configure(JobConf conf) {
        compressionCodecs = new CompressionCodecFactory(conf);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        // Only uncompressed files are reported as splittable.
        return compressionCodecs.getCodec(file) == null;
    }

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
    }
}
/**********************************
*MapperMapSideJoinLargeDatasets.java
*Mapper
**********************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
        Mapper<LongWritable, TupleWritable, Text, Text> {

    Text txtKey = new Text("");
    Text txtValue = new Text("");

    @Override
    public void map(LongWritable key, TupleWritable value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        if (value.toString().length() > 0) {
            txtKey.set(key.toString());
            // Position 0 holds the employee record, position 1 the salary record,
            // in the order the paths were passed to compose(); the key is excluded.
            String[] arrEmpAttributes = value.get(0).toString().split(",");
            String[] arrSalAttributes = value.get(1).toString().split(",");
            // Employee: [DOB,FName,LName,...]; salary: [Salary,FromDate,ToDate].
            txtValue.set(arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrSalAttributes[0]);
            output.collect(txtKey, txtValue);
        }
    }
}
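The field positions used in map() are easy to get wrong because the join key has already been stripped from both values. Here is a minimal sketch of the same projection on plain strings, using the first sample record from the data above; the class and method names are hypothetical.

```java
// Reproduces the mapper's projection: EmpNo, FName, LName, Salary, tab-separated.
class JoinProjectionSketch {

    // employeeValue is "DOB,FName,LName,Gender,HireDate,DeptNo" (key removed);
    // salaryValue is "Salary,FromDate,ToDate" (key removed).
    static String project(long empNo, String employeeValue, String salaryValue) {
        String[] emp = employeeValue.split(",");
        String[] sal = salaryValue.split(",");
        return empNo + "\t" + emp[1] + "\t" + emp[2] + "\t" + sal[0];
    }

    public static void main(String[] args) {
        System.out.println(project(10001L,
                "1953-09-02,Georgi,Facello,M,1986-06-26,d005",
                "88958,2002-06-22,9999-01-01"));
    }
}
```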
/**********************************
*DriverMapSideJoinLargeDatasets
*Driver
**********************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverMapSideJoinLargeDatasets {

    public static void main(String[] args) throws Exception {
        // JobConf(String) treats its argument as a configuration resource, not a
        // job name, so pass the driver class and set the name explicitly.
        JobConf conf = new JobConf(DriverMapSideJoinLargeDatasets.class);
        conf.setJobName("DriverMapSideJoinLargeDatasets");
        String[] jobArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        Path dirEmployeesData = new Path(jobArgs[0]);
        Path dirSalaryData = new Path(jobArgs[1]);
        Path dirOutput = new Path(jobArgs[2]);

        conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);

        // Join configuration: composite input format plus the join expression.
        conf.setInputFormat(CompositeInputFormat.class);
        String strJoinStmt = CompositeInputFormat.compose("inner",
                KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
        conf.set("mapred.join.expr", strJoinStmt);

        // Map-only job.
        conf.setNumReduceTasks(0);

        conf.setOutputFormat(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(conf, dirOutput);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // runJob submits the job and blocks until it completes,
        // so no polling loop is needed afterwards.
        RunningJob job = JobClient.runJob(conf);
        System.exit(job.isSuccessful() ? 0 : 2);
    }
}
**************************
HDFS data load commands
**************************
hadoop fs -mkdir joinProject
hadoop fs -put joinProject/* joinProject/
**************************
Command to run program
**************************
hadoop jar ~/Blog/joinProject/MapSideJoinLargeDatasets/jar/MapSideJoinLgDsOAPI.jar DriverMapSideJoinLargeDatasets /user/akhanolk/joinProject/data/employees_sorted/part-e /user/akhanolk/joinProject/data/salaries_sorted/part-s /user/akhanolk/joinProject/output/output-MapSideJoinLargeDatasets
**************************
Results
**************************
...
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Map-Reduce Framework
13/09/22 13:11:17 INFO mapred.JobClient: Map input records=224683000
13/09/22 13:11:17 INFO mapred.JobClient: Map output records=224683000
...
$ hadoop fs -cat joinProject/output/output-MapSideJoinLargeDatasets/part* | less
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
.....
**************************
References
**************************
Concepts:
Pro Hadoop
Hadoop: The Definitive Guide
Data-Intensive Text Processing with MapReduce
Code:
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
Data:
The data in this solution is from the MySQL employees sample database - http://dev.mysql.com/doc/employee/en/index.html
*******************************
Pig - map-side join of datasets with cardinality of 1..1,
using 'replicated', or 'merge' if sorted
*********************************
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,fName:chararray,lName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_active/part-sc' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
salDS = foreach rawSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo using 'replicated';
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ1To1';
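For comparison with the merge-style join used in the MapReduce program, the idea behind a 'replicated' join is a hash lookup: the relation listed last is held in memory and the other relation is streamed past it, so it only fits when that relation is small enough for memory. The following is a minimal plain-Java sketch of that idea, not Pig's implementation; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative hash join: one side is held in a map, the other is streamed.
class ReplicatedJoinSketch {

    // streamedRows: {empNo, fName} rows; inMemorySalaries: empNo -> salary.
    static List<String> join(String[][] streamedRows, Map<String, String> inMemorySalaries) {
        List<String> out = new ArrayList<>();
        for (String[] row : streamedRows) {
            String salary = inMemorySalaries.get(row[0]);
            if (salary != null) {
                // Inner join: emit only when the key exists on both sides.
                out.add(row[0] + "\t" + row[1] + "\t" + salary);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> salaries = new HashMap<>();
        salaries.put("10001", "88958");
        salaries.put("10002", "72527");
        String[][] employees = {{"10001", "Georgi"}, {"10002", "Bezalel"}, {"10003", "Parto"}};
        for (String row : join(employees, salaries)) {
            System.out.println(row);
        }
    }
}
```

Unlike the merge join, this approach does not need sorted input, but it does need one side to fit in memory.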
@bigdatasunil

Hi,

Thanks for sharing. It's very useful. Can we do the same with the new API? If yes, please suggest how.

@sky88088

Hi,
Thanks for sharing. I have a small question.
In KeyValueLongInputFormat, files that are not compressed are splittable. Is that appropriate in this case?
As far as I know, CompositeInputFormat requires the same number of input splits for both datasets. If the files are splittable, I think the number of splits for the two datasets may differ, or the key ranges may not match.
