Map-side join example: Java code that joins two datasets, a large one (TSV format) and a small lookup dataset (text) made available through the DistributedCache.
This gist demonstrates a map-side join: the small dataset is loaded from the DistributedCache into an in-memory HashMap in each mapper
and joined with the records of the larger dataset, so no reduce phase is needed.
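The core idea can be sketched without Hadoop at all. This minimal example (class and method names are hypothetical, not from the gist) builds a HashMap from the small dataset once, then streams the large dataset and appends the looked-up value to each record, using the same NOT-FOUND fallback as the mapper. In the real job, buildLookup corresponds to setup() and the per-record loop corresponds to map().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hadoop-free sketch of a hash (map-side style) join: small side in memory, large side streamed. */
public class InMemoryHashJoin {

    /** Builds the lookup table from "key<TAB>value" lines of the small dataset. */
    static Map<String, String> buildLookup(List<String> smallLines) {
        Map<String, String> lookup = new HashMap<>();
        for (String line : smallLines) {
            String[] parts = line.split("\t");
            if (parts.length >= 2) { // skip blank or malformed lines
                lookup.put(parts[0].trim(), parts[1].trim());
            }
        }
        return lookup;
    }

    /** Appends the looked-up value to each large-dataset line; joinField is the 0-based key column. */
    static List<String> join(List<String> largeLines, Map<String, String> lookup, int joinField) {
        List<String> out = new ArrayList<>();
        for (String line : largeLines) {
            String[] fields = line.split("\t");
            if (fields.length <= joinField) {
                continue; // record too short to contain the join key
            }
            String match = lookup.getOrDefault(fields[joinField].trim(), "NOT-FOUND");
            out.add(line + "\t" + match);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> depts = buildLookup(Arrays.asList("d004\tProduction", "d005\tDevelopment"));
        List<String> joined = join(
                Arrays.asList("10001\tGeorgi\td005", "10003\tParto\td004", "10099\tX\td999"), depts, 2);
        joined.forEach(System.out::println);
    }
}
```

Each large-side record costs one HashMap lookup, which is why this pattern only works while the small side fits comfortably in a mapper's heap.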
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
    data
        employees_tsv
            employees_tsv
        departments_sorted
            departments_txt
    MapSideJoin-DistCacheTxtFile
        src
            MapperMapSideJoinDCacheTextFile.java
            DriverMapSideJoinDCacheTxtFile.java
        jar
            MapSideJoinDCacheTextFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_txt)
[DeptNo DeptName] - Tab separated
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
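The sample rows contain seven tab-separated fields (Emp_no, DOB, FName, LName, Gender, HireDate, DeptNo), so DeptNo sits at 0-based index 6, which is the index the mapper reads. The hypothetical helper below (EmployeeRecord, not part of the gist) names those positions and rejects blank or short lines instead of letting them throw ArrayIndexOutOfBoundsException. It is a plain-Java sketch, not Hadoop code.

```java
/** Hypothetical helper documenting the employees_tsv layout (7 tab-separated fields, 0-based). */
public final class EmployeeRecord {
    static final int EMP_NO = 0, DOB = 1, FIRST_NAME = 2, LAST_NAME = 3,
                     GENDER = 4, HIRE_DATE = 5, DEPT_NO = 6;
    static final int FIELD_COUNT = 7;

    private final String[] fields;

    private EmployeeRecord(String[] fields) {
        this.fields = fields;
    }

    /** Returns null for blank or short lines rather than throwing later on field access. */
    static EmployeeRecord parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        String[] f = line.split("\t");
        return f.length >= FIELD_COUNT ? new EmployeeRecord(f) : null;
    }

    /** Returns the trimmed value at one of the index constants above. */
    String get(int index) {
        return fields[index].trim();
    }

    public static void main(String[] args) {
        EmployeeRecord r = parse("10001\t1953-09-02\tGeorgi\tFacello\tM\t1986-06-26\td005");
        System.out.println(r.get(EMP_NO) + " -> " + r.get(DEPT_NO)); // prints "10001 -> d005"
    }
}
```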
********************************************
*Expected Results
********************************************
Every field of the employees_tsv record, followed by a tab and the department name (looked up in the departments file):
10001 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
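A minimal plain-Java sketch, assuming well-formed seven-field records, of how one expected-result line is assembled. With the default TextOutputFormat the job writes the key, a tab, then the value, so emitting Emp_no as the key and the remaining six fields plus the department name as the value reproduces the original record followed by the name. ExpectedLine and toKeyValue are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: split one employee line into the (key, value) pair the mapper emits. */
public class ExpectedLine {

    /** Assumes employeeLine has at least 7 tab-separated fields, DeptNo at index 6. */
    static String[] toKeyValue(String employeeLine, Map<String, String> depts) {
        String[] f = employeeLine.split("\t");
        String deptName = depts.get(f[6].trim());
        if (deptName == null || deptName.isEmpty()) {
            deptName = "NOT-FOUND"; // same fallback label the mapper uses
        }
        // Value: every field after Emp_no, each exactly once, then the department name
        StringBuilder value = new StringBuilder();
        for (int i = 1; i < f.length; i++) {
            value.append(f[i]).append('\t');
        }
        value.append(deptName);
        return new String[] {f[0], value.toString()};
    }

    public static void main(String[] args) {
        Map<String, String> depts = new HashMap<>();
        depts.put("d005", "Development");
        String[] kv = toKeyValue("10001\t1953-09-02\tGeorgi\tFacello\tM\t1986-06-26\td005", depts);
        // TextOutputFormat separates key and value with a tab by default
        System.out.println(kv[0] + "\t" + kv[1]);
    }
}
```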
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFile
********************************************/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperMapSideJoinDCacheTextFile extends
        Mapper<LongWritable, Text, Text, Text> {

    // Lookup table DeptNo -> DeptName, filled once per map task in setup()
    private HashMap<String, String> departmentMap = new HashMap<String, String>();
    private Text txtMapOutputKey = new Text("");
    private Text txtMapOutputValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Local copies of the files the driver added to the DistributedCache.
        // (DistributedCache is deprecated in Hadoop 2.x in favour of
        // Job.addCacheFile() and context.getCacheFiles().)
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
                .getConfiguration());
        if (cacheFilesLocal == null) {
            return; // nothing cached: every record will be tagged NOT-FOUND
        }
        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().trim().equals("departments_txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadDepartmentsHashMap(eachPath, context);
            }
        }
    }

    private void loadDepartmentsHashMap(Path filePath, Context context)
            throws IOException {
        BufferedReader brReader = null;
        String strLineRead;
        try {
            brReader = new BufferedReader(new FileReader(filePath.toString()));
            // Read each line, split on the tab and load DeptNo -> DeptName
            while ((strLineRead = brReader.readLine()) != null) {
                String[] deptFieldArray = strLineRead.split("\\t");
                if (deptFieldArray.length < 2) {
                    continue; // skip blank or malformed lines
                }
                departmentMap.put(deptFieldArray[0].trim(),
                        deptFieldArray[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            if (brReader != null) {
                brReader.close();
            }
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
        // Employee record: Emp_no, DOB, FName, LName, Gender, HireDate, DeptNo
        String[] arrEmpAttributes = value.toString().split("\\t");
        if (arrEmpAttributes.length < 7) {
            return; // skip blank or malformed lines instead of writing stale output
        }
        // HashMap.get() returns null for an unknown DeptNo, so test for null
        // with == rather than calling equals() on a possibly-null reference
        String strDeptName = departmentMap.get(arrEmpAttributes[6].trim());
        if (strDeptName == null || strDeptName.isEmpty()) {
            strDeptName = "NOT-FOUND";
        }
        txtMapOutputKey.set(arrEmpAttributes[0]);
        // Value: every remaining employee field exactly once, then the department name
        txtMapOutputValue.set(arrEmpAttributes[1] + "\t"
                + arrEmpAttributes[2] + "\t"
                + arrEmpAttributes[3] + "\t"
                + arrEmpAttributes[4] + "\t"
                + arrEmpAttributes[5] + "\t"
                + arrEmpAttributes[6] + "\t" + strDeptName);
        context.write(txtMapOutputKey, txtMapOutputValue);
    }
}
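The lookup-loading step in setup() can be exercised outside Hadoop. The sketch below (DepartmentLookupLoader is a hypothetical name) assumes the cached file has already been localized to a local path, as DistributedCache.getLocalCacheFiles() provides, and uses try-with-resources (Java 7+) so the reader is closed even if parsing fails; lines without a tab are skipped.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hadoop-free sketch of loading the departments lookup file into a HashMap. */
public class DepartmentLookupLoader {

    static Map<String, String> load(Path localFile) throws IOException {
        Map<String, String> map = new HashMap<>();
        // try-with-resources closes the reader on both normal exit and exceptions
        try (BufferedReader reader = Files.newBufferedReader(localFile, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2); // DeptNo, then the rest of the line as DeptName
                if (parts.length == 2) {
                    map.put(parts[0].trim(), parts[1].trim());
                }
            }
        }
        return map;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("departments_txt", ".tsv");
        Files.write(tmp, "d001\tMarketing\nd003\tHuman Resources\nbad line\n"
                .getBytes(StandardCharsets.UTF_8));
        Map<String, String> depts = load(tmp);
        System.out.println(depts.size() + " entries, d003 = " + depts.get("d003")); // 2 entries, d003 = Human Resources
        Files.delete(tmp);
    }
}
```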
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverMapSideJoinDCacheTxtFile extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Two parameters are required: <input dir> <output dir>%n");
            return -1;
        }
        // new Job(Configuration) is deprecated in Hadoop 2.x; Job.getInstance(getConf()) replaces it
        Job job = new Job(getConf());
        // The Job copies the Configuration it is given, so cache settings must go
        // into job.getConfiguration(), not into getConf()
        Configuration conf = job.getConfiguration();
        job.setJobName("Map-side join with text lookup file in DCache");
        // Ship the small lookup file to every map task (HDFS path from the author's cluster)
        DistributedCache.addCacheFile(
                new URI("/user/akhanolk/joinProject/data/departments_sorted/departments_txt"),
                conf);
        job.setJarByClass(DriverMapSideJoinDCacheTxtFile.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MapperMapSideJoinDCacheTextFile.class);
        // Map-only job: mapper output is written straight to HDFS, so declare its types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverMapSideJoinDCacheTxtFile(), args);
        System.exit(exitCode);
    }
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache
********************************************
*Program Output
********************************************
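Note: the output below was captured with a mapper that appended arrEmpAttributes[1] (DOB) twice to the output value, which is why the date of birth appears in both the second and third columns.
hadoop fs -cat joinProject/data/output-MapSideTextFileLookUpDistCache/part* | less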
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
10010 1963-06-01 1963-06-01 Duangkaew Piveteau F 1989-08-24 d006 Quality Management
10012 1960-10-04 1960-10-04 Patricio Bridgland M 1992-12-18 d005 Development
10013 1963-06-07 1963-06-07 Eberhardt Terkki M 1985-10-20 d003 Human Resources
.....
airawat commented Sep 18, 2013

My notes - please disregard

scp MapSideJoinDCacheTextFile.jar akhanolk@cdh-dev01:~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/

hadoop fs -rm -R joinProject/data/output-MapSideTextFileLookUpDistCache

hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache

vashdev commented Sep 10, 2014

Hi, maybe you have labeled a distributed-cache (lookup) example as a map-side join.
Doesn't a map-side join essentially require CompositeInputFormat?

barak066 commented Oct 1, 2015

Hi, could you tell me when the setup method is called? Before and after what?

pushkingupta2001 commented Oct 6, 2016

@barak066 - setup() runs once per map task, before map() is invoked for the first record of that task's input split.
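To make the answer above concrete, here is a simplified, Hadoop-free model of the order in which org.apache.hadoop.mapreduce.Mapper#run calls its hooks (in Hadoop 2.x: setup once, map once per record, cleanup once in a finally block). MapperLifecycleModel is a hypothetical class that only records the call order; it is not the Hadoop implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of the Mapper lifecycle: records the order of setup/map/cleanup calls. */
public class MapperLifecycleModel {

    final List<String> calls = new ArrayList<>();

    void setup() { calls.add("setup"); }                 // where the gist loads the lookup file
    void map(String record) { calls.add("map:" + record); }
    void cleanup() { calls.add("cleanup"); }

    /** Roughly mirrors the shape of Mapper.run(Context) for one input split. */
    void run(Iterator<String> split) {
        setup();
        try {
            while (split.hasNext()) {
                map(split.next());
            }
        } finally {
            cleanup();
        }
    }

    public static void main(String[] args) {
        MapperLifecycleModel m = new MapperLifecycleModel();
        m.run(Arrays.asList("r1", "r2").iterator());
        System.out.println(m.calls); // [setup, map:r1, map:r2, cleanup]
    }
}
```

Because setup() runs before any record is processed, the HashMap is fully populated by the time the first map() call performs a lookup.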

Piyushbalwani commented Dec 14, 2016

Hi, if departments_txt is also large, what would the approach be?
