Skip to content

Instantly share code, notes, and snippets.

@airawat
Last active December 23, 2015 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save airawat/6597557 to your computer and use it in GitHub Desktop.
Save airawat/6597557 to your computer and use it in GitHub Desktop.
Map-side join example - Java code for joining two datasets - one large (tsv format), and one with lookup data (mapfile), made available through DistributedCache
This gist demonstrates how to do a map-side join, joining a MapFile from distributedcache
with a larger dataset in HDFS.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
GitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_map.tar.gz
MapSideJoin-DistCacheMapFile
src
MapperMapSideJoinDCacheMapFile.java
DriverMapSideJoinDCacheMapFile
jar
MapSideJoinDCacheMapFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_map)
[DeptNo DeptName] - MapFile
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Everything in the employees_tsv file followed by a tab and the department name (from the department file). Note: as the sample below shows, the mapper emits the DOB field twice.
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheMapFile
********************************************/
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Map-side join mapper: streams employee records (TSV) from HDFS and enriches
 * each one with the department name looked up from a MapFile that was shipped
 * to the task node via the DistributedCache. Used in a map-only job.
 */
public class MapperMapSideJoinDCacheMapFile extends
        Mapper<LongWritable, Text, Text, Text> {

    private MapFile.Reader deptMapReader = null;

    // Reusable Writables to avoid per-record object allocation.
    private Text txtMapOutputKey = new Text("");
    private Text txtMapOutputValue = new Text("");
    private Text txtMapLookupKey = new Text("");
    private Text txtMapLookupValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, LOAD_MAP_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Locate the localized cache archive and open a MapFile reader over
        // the uncompressed "departments_map" directory inside it.
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
                .getConfiguration());
        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().toString().trim()
                    .equals("departments_map.tar.gz")) {
                URI uriUncompressedFile = new File(eachPath.toString()
                        + "/departments_map").toURI();
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadDepartmentsMap(uriUncompressedFile, context);
            }
        }
    }

    /** Opens the departments MapFile; on failure bumps LOAD_MAP_ERROR and
     *  leaves deptMapReader null (map() then falls back to "NOT-FOUND"). */
    @SuppressWarnings("deprecation")
    private void loadDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        FileSystem dfs = FileSystem.get(context.getConfiguration());
        try {
            deptMapReader = new MapFile.Reader(dfs,
                    uriUncompressedFile.toString(), context.getConfiguration());
        } catch (Exception e) {
            context.getCounter(MYCOUNTER.LOAD_MAP_ERROR).increment(1);
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
        if (value.toString().length() > 0) {
            String arrEmpAttributes[] = value.toString().split("\\t");

            txtMapLookupKey.set(arrEmpAttributes[6]);
            // BUG FIX: the original tested txtMapLookupValue.equals(null)
            // (always false) and Text.equals("") (Text vs String - always
            // false), so the NOT-FOUND fallback never fired. Test the Text's
            // length instead; the value is reset to "" after every record.
            // Also guard against a reader that failed to load in setup().
            if (deptMapReader != null) {
                deptMapReader.get(txtMapLookupKey, txtMapLookupValue);
            }
            if (txtMapLookupValue.getLength() == 0) {
                txtMapLookupValue.set("NOT-FOUND");
            }

            txtMapOutputKey.set(arrEmpAttributes[0]);
            // NOTE(review): attribute [1] (DOB) is emitted twice - this
            // matches the gist's documented expected output, so it is
            // preserved as-is.
            txtMapOutputValue.set(arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrEmpAttributes[3] + "\t"
                    + arrEmpAttributes[4] + "\t"
                    + arrEmpAttributes[5] + "\t"
                    + arrEmpAttributes[6] + "\t"
                    + txtMapLookupValue.toString());

            // BUG FIX: emit inside the non-empty branch; the original called
            // context.write() unconditionally, so every blank input line
            // re-emitted the previous (stale) record.
            context.write(txtMapOutputKey, txtMapOutputValue);
        }
        txtMapLookupValue.set("");
        txtMapLookupKey.set("");
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // BUG FIX: null-guard - setup() may never have opened the reader
        // (archive absent or MapFile.Reader construction failed).
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}
/********************************************
*Driver
*DriverMapSideJoinDCacheMapFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the map-side join job: registers the gzipped departments
 * MapFile archive in the DistributedCache and launches a map-only job over
 * the employees TSV input.
 */
public class DriverMapSideJoinDCacheMapFile extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Exactly two arguments expected: input directory, output directory.
        if (args.length != 2) {
            System.out
                    .printf("Two parameters are required for DriverMapSideJoinDCacheMapFile- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("Map-side join with mapfile in DCache");

        // Ship the archive to every task node; the mapper un-archives and
        // opens it in setup().
        Configuration conf = job.getConfiguration();
        DistributedCache.addCacheArchive(
                new URI("/user/akhanolk/joinProject/data/departments_map.tar.gz"),
                conf);

        job.setJarByClass(DriverMapSideJoinDCacheMapFile.class);
        job.setMapperClass(MapperMapSideJoinDCacheMapFile.class);
        // Map-only join: the lookup happens in the mapper, no reduce phase.
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(),
                new DriverMapSideJoinDCacheMapFile(), args));
    }
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/MapSideJoinDCacheMapFile.jar DriverMapSideJoinDCacheMapFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
..
@airawat
Copy link
Author

airawat commented Sep 18, 2013

My notes - please disregard

scp /Users/akhanolkar/Documents/hadoop-jars/MapSideJoinDCacheMapFile.jar akhanolk@cdh-dev01:~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/

hadoop fs -rm -R joinProject/data/output-MapSideFileLookUpDistCache

hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/MapSideJoinDCacheMapFile.jar DriverMapSideJoinDCacheMapFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment