Skip to content

Instantly share code, notes, and snippets.

@airawat
Last active September 25, 2016 14:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save airawat/6600892 to your computer and use it in GitHub Desktop.
Save airawat/6600892 to your computer and use it in GitHub Desktop.
Map-side join example - Java code for joining two datasets - one large (tsv format), and one with reference data (txt file), made available through DistributedCache via command line (GenericOptionsParser)
This gist is part of a series of gists related to Map-side joins in Java map-reduce.
In an earlier gist (https://gist.github.com/airawat/6597557), the reference data, already
available in HDFS, was added to the distributed cache from the driver code.
This gist demonstrates adding a local file to the distributed cache from the command line (using the -files option).
Refer to the gist at https://gist.github.com/airawat/6597557 for:
1. Data samples and structure
2. Expected results
3. Commands to load data to HDFS
The listing below includes:
4. Data and code download location
5. Mapper code
6. Driver code
7. Command to run the program
8. Results
04. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
GitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFileGOP
src
MapperMapSideJoinDCacheTextFileGOP.java
DriverMapSideJoinDCacheTxtFileGOP.java
jar
MapSideJoin-DistCacheTxtFileGOP.jar
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFileGOP
********************************************/
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Map-only join mapper: enriches each tab-separated employee record with the
 * department name looked up from an in-memory table.
 *
 * <p>The lookup table is loaded once per mapper in {@link #setup(Context)} from a
 * tab-separated file named {@code departments_txt}, opened relative to the task's
 * current working directory (the job run command ships this file with
 * {@code -files}). Each lookup line is expected to hold at least two columns:
 * department id, department name.
 *
 * <p>Each employee record is expected to hold at least seven tab-separated
 * columns; column 0 becomes the output key and column 6 is the department id
 * used for the lookup.
 */
public class MapperMapSideJoinDCacheTextFileGOP extends
        Mapper<LongWritable, Text, Text, Text> {

    /** Lookup file name, resolved against the task's working directory. */
    private static final String LOOKUP_FILE_NAME = "departments_txt";

    /** Department name emitted when the department id has no (non-empty) match. */
    private static final String DEPT_NOT_FOUND = "NOT-FOUND";

    /** Minimum number of columns an employee record must have (indexes 0..6 are read). */
    private static final int MIN_EMPLOYEE_FIELDS = 7;

    /** Index of the department id column within an employee record. */
    private static final int DEPT_ID_INDEX = 6;

    /** Department id -> department name; filled in setup(), read-only in map(). */
    private final HashMap<String, String> departmentMap = new HashMap<String, String>();

    // Reused across map() calls to avoid allocating new Text objects per record.
    private final Text txtMapOutputKey = new Text("");
    private final Text txtMapOutputValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
    }

    /**
     * Loads the department lookup file into memory.
     *
     * <p>Failure to open or read the file is deliberately non-fatal: the error is
     * printed, the matching counter is incremented, and the mapper continues with
     * whatever was loaded (unmatched records then get {@code NOT-FOUND}).
     *
     * @param context task context, used here only for counters
     */
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        File lookupFile = new File(LOOKUP_FILE_NAME);
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(lookupFile));
            String line;
            // Read each line, split on tab and load id -> name into the map.
            while ((line = reader.readLine()) != null) {
                String[] deptFields = line.split("\\t");
                if (deptFields.length < 2) {
                    // Blank or malformed lookup line: nothing usable to load.
                    continue;
                }
                departmentMap.put(deptFields[0].trim(), deptFields[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            // Always release the file handle, even when reading failed part-way.
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if close() fails on a read-only stream.
                }
            }
        }
    }

    /**
     * Emits one joined record per well-formed employee line.
     *
     * <p>Output key is column 0; output value is the remaining columns followed by
     * the department name (or {@code NOT-FOUND}). Empty lines produce no output.
     * Lines with fewer than seven columns produce no output and increment
     * {@code SOME_OTHER_ERROR}.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one tab-separated employee record
     * @param context task context used for counters and output
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);

        String line = value.toString();
        if (line.length() == 0) {
            // Emit nothing for empty lines (previously the prior record was re-emitted).
            return;
        }

        String[] emp = line.split("\\t");
        if (emp.length < MIN_EMPLOYEE_FIELDS) {
            // Too few columns to read index 6; skip and count instead of failing the task.
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            return;
        }

        // HashMap.get returns null for an unknown id, so test with == null.
        String deptName = departmentMap.get(emp[DEPT_ID_INDEX]);
        if (deptName == null || deptName.equals("")) {
            deptName = DEPT_NOT_FOUND;
        }

        txtMapOutputKey.set(emp[0]);
        // NOTE(review): column 1 is written twice, exactly as in the original code.
        // This looks like a copy-paste slip, but removing it would change the output
        // schema - confirm with downstream consumers before changing.
        txtMapOutputValue.set(emp[1] + "\t"
                + emp[1] + "\t"
                + emp[2] + "\t"
                + emp[3] + "\t"
                + emp[4] + "\t"
                + emp[5] + "\t"
                + emp[6] + "\t" + deptName);

        context.write(txtMapOutputKey, txtMapOutputValue);
    }
}
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFileGOP
********************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the map-only join job.
 *
 * <p>Expects exactly two application arguments: the input directory and the
 * output directory. The job is configured with
 * {@code MapperMapSideJoinDCacheTextFileGOP} as its mapper and zero reduce tasks.
 * It is launched through {@code ToolRunner}, and the job is built from the
 * configuration returned by {@code getConf()}.
 */
public class DriverMapSideJoinDCacheTxtFileGOP extends Configured implements
        Tool {

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} the output directory
     * @return -1 when the argument count is wrong, 0 when the job succeeds, 1 when it fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Guard clause: exactly <input dir> <output dir> must be supplied.
        if (args.length != 2) {
            System.out
                    .printf("Two parameters are required for DriverMapSideJoinDCacheTxtFileGOP- <input dir> <output dir>\n");
            return -1;
        }

        Job joinJob = new Job(getConf());

        // Job identity.
        joinJob.setJarByClass(DriverMapSideJoinDCacheTxtFileGOP.class);
        joinJob.setJobName("Map-side join with text lookup file in DCache-GenericOptionsParser");

        // Input and output locations.
        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        FileInputFormat.setInputPaths(joinJob, inputDir);
        FileOutputFormat.setOutputPath(joinJob, outputDir);

        // Mapper only: no reduce tasks are configured.
        joinJob.setMapperClass(MapperMapSideJoinDCacheTextFileGOP.class);
        joinJob.setNumReduceTasks(0);

        if (joinJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    /**
     * Command-line entry point; exits the JVM with the status returned by {@code run}.
     *
     * @param args raw command-line arguments, handed to {@code ToolRunner}
     */
    public static void main(String[] args) throws Exception {
        Tool driver = new DriverMapSideJoinDCacheTxtFileGOP();
        int status = ToolRunner.run(new Configuration(), driver, args);
        System.exit(status);
    }
}
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
********************************************
*Program Output
********************************************
See - https://gist.github.com/airawat/6597557
@airawat
Copy link
Author

airawat commented Sep 18, 2013

My notes - please disregard

scp /Users/akhanolkar/Documents/hadoop-jars/MapSideJoin-DistCacheTxtFileGOP.jar akhanolk@cdh-dev01:~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/

hadoop fs -rm -R joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP

hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment