@qxj
Forked from airawat/00-MultipleOutputs
Created January 10, 2017 07:12
MultipleOutputs sample program - A program that demonstrates how to generate an output file for each key
********************************
Gist
********************************
Motivation
-----------
A typical MapReduce job creates output files whose names start with "part-", followed by "m" or "r"
(depending on whether the file is map or reduce output) and then the part number, for example
part-r-00000. In some scenarios we may want to create separate files based on criteria such as the
data keys and/or values. This is where MultipleOutputs comes in.
More about MultipleOutputs
---------------------------
Hadoop: The Definitive Guide describes it as follows:
"MultipleOutputs allows you to write data to files whose names are derived from the output keys and
values, or in fact from an arbitrary string. This allows each reducer (or mapper in a map-only job)
to create more than a single file. Filenames are of the form name-m-nnnnn for map outputs and
name-r-nnnnn for reduce outputs, where name is an arbitrary name that is set by the program,
and nnnnn is an integer designating the part number, starting from zero. The part number
ensures that outputs written from different partitions (mappers or reducers) do not collide in the
case of the same name."
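To make the naming scheme concrete, here is a small standalone Java sketch (plain JDK, no Hadoop classes) that assembles names of the form name-m-nnnnn and name-r-nnnnn. The class and method names are made up for illustration; the sketch only mirrors the format described in the quote and is not Hadoop's own implementation.

```java
import java.util.Locale;

// Hypothetical helper: builds output file names of the form name-m-nnnnn / name-r-nnnnn.
class OutputFileNames {

    /**
     * @param name      base name chosen by the program (e.g. "part" or a department ID)
     * @param isReduce  true for reducer output ("r"), false for mapper output ("m")
     * @param partition zero-based index of the task that writes the file
     */
    static String fileName(String name, boolean isReduce, int partition) {
        // %05d zero-pads the part number to five digits; Locale.ROOT keeps the digits ASCII
        return String.format(Locale.ROOT, "%s-%s-%05d", name, isReduce ? "r" : "m", partition);
    }

    public static void main(String[] args) {
        System.out.println(fileName("part", true, 0));   // part-r-00000
        System.out.println(fileName("d001", true, 2));   // d001-r-00002
        System.out.println(fileName("d004", false, 13)); // d004-m-00013
    }
}
```

The part number is what keeps two tasks from colliding: if two reducers both use the base name d001, their files still differ in the nnnnn suffix.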
About LazyOutputFormat
-----------------------
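A typical MapReduce program can produce empty output files, depending on your implementation.
This sample is a case in point: the reducer writes only through MultipleOutputs and never calls
context.write(), so with a plain TextOutputFormat each reducer would still create an empty
part-r-nnnnn file. To suppress the creation of empty files, use LazyOutputFormat, which creates
the underlying output file only when the first record is written.
Two lines in your driver do the trick:
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
and
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);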
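The idea behind LazyOutputFormat can be illustrated outside Hadoop. The sketch below is a plain-Java analogy, not Hadoop code: LazyFileWriter is a hypothetical class that defers creating its file until the first record arrives, so a writer that never receives a record leaves nothing on disk.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical analogy of "lazy" output: the file is created only on the first write.
class LazyFileWriter implements AutoCloseable {
    private final Path path;
    private Writer out; // stays null until the first record is written

    LazyFileWriter(Path path) {
        this.path = path;
    }

    void write(String key, String value) throws IOException {
        if (out == null) {
            // First record: only now is the file created on disk
            out = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
        }
        out.write(key + "\t" + value + "\n");
    }

    @Override
    public void close() throws IOException {
        if (out != null) {
            out.close(); // nothing to close if nothing was ever written
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("lazy-demo");
        try (LazyFileWriter unused = new LazyFileWriter(dir.resolve("part-r-00000"));
             LazyFileWriter used = new LazyFileWriter(dir.resolve("d001-r-00000"))) {
            used.write("d001", "Yetto\tLucian\t39682");
        }
        System.out.println(Files.exists(dir.resolve("part-r-00000"))); // false
        System.out.println(Files.exists(dir.resolve("d001-r-00000"))); // true
    }
}
```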
Sample program
---------------
This gist includes a sample program that demonstrates the MultipleOutputs functionality.
The input is a file of employee data in which one of the attributes is the department number.
The expected output is one file per department, containing the employees who belong to that department.
The program also suppresses the creation of empty files.
*******************************
*Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues.
Directory structure
-------------------
formatProject
    data
        employees_tsv
            employees_tsv
    formatMultipleOutputs
        src
            MapperFormatMultiOutput.java
            ReducerFormatMultiOutput.java
            DriverFormatMultiOutput.java
        jar
            formatMultiOutput.jar
*******************************
*Sample Data
*******************************
The input file is tab-separated, with these columns:
EmpID   DOB         FName      LName     Gender  HireDate    DeptID
10003   1959-12-03  Parto      Bamford   M       1986-08-28  d004
10004   1954-05-01  Chirstian  Koblick   M       1986-12-01  d004
10005   1955-01-21  Kyoichi    Maliniak  M       1989-09-12  d003
.....
*******************************
*Expected results
*******************************
One file for each department.
Each file should contain the following tab-separated employee attributes, in this order:
DeptNo LName FName EmpNo
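As a worked example of the expected layout, assuming the input columns are tab-separated in the order shown in the sample data, the first sample row becomes d004, Bamford, Parto, 10003. The plain-Java method below (EmployeeProjection is a hypothetical name, not part of the gist) performs the same field selection that the mapper and reducer perform together.

```java
// Hypothetical helper: projects one tab-separated employee record onto
// the "DeptNo LName FName EmpNo" output layout.
class EmployeeProjection {

    static String project(String record) {
        // Fields: 0=EmpID 1=DOB 2=FName 3=LName 4=Gender 5=HireDate 6=DeptID
        String[] f = record.split("\t");
        if (f.length < 7) {
            throw new IllegalArgumentException("expected 7 fields, got " + f.length);
        }
        return String.join("\t", f[6], f[3], f[2], f[0]);
    }

    public static void main(String[] args) {
        String in = String.join("\t",
                "10003", "1959-12-03", "Parto", "Bamford", "M", "1986-08-28", "d004");
        System.out.println(project(in)); // d004, Bamford, Parto, 10003 separated by tabs
    }
}
```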
***************************************
**Mapper - MapperFormatMultiOutput.java
***************************************
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (DeptID, "LName<TAB>FName<TAB>EmpID") for each tab-separated employee record.
public class MapperFormatMultiOutput extends
        Mapper<LongWritable, Text, Text, Text> {

    // Reused across map() calls to avoid allocating new Writables per record
    private final Text txtKey = new Text();
    private final Text txtValue = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.isEmpty()) {
            return; // skip blank lines
        }
        // Fields: 0=EmpID 1=DOB 2=FName 3=LName 4=Gender 5=HireDate 6=DeptID
        String[] arrEmpAttributes = line.split("\\t");
        if (arrEmpAttributes.length < 7) {
            return; // skip malformed records instead of failing the task
        }
        txtKey.set(arrEmpAttributes[6]);
        txtValue.set(arrEmpAttributes[3] + "\t"
                + arrEmpAttributes[2] + "\t"
                + arrEmpAttributes[0]);
        context.write(txtKey, txtValue);
    }
}
*******************************************
**Reducer - ReducerFormatMultiOutput.java
*******************************************
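import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Writes each department's records to a file whose base name is the department ID,
// e.g. d001-r-00002 (the suffix is this reduce task's partition number).
public class ReducerFormatMultiOutput extends Reducer<Text, Text, Text, Text> {

    // Typed with the reducer's output key/value classes (avoids raw-type warnings)
    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Third argument is the base output path; here it is the key itself
            mos.write(key, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Flush and close every file opened through MultipleOutputs
        mos.close();
    }
}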
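The reducer's effect can be modelled in memory to show why a single reduce task may produce several files. In this plain-Java sketch (ReduceTaskModel is hypothetical, and "files" are simulated as a map from file name to lines), each write uses the key as the base name and appends the task's fixed partition number, and each line is the key and value joined by a tab, as TextOutputFormat does by default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of one reduce task calling mos.write(key, value, key.toString()).
class ReduceTaskModel {
    private final int partition; // this task's index, fixed for the task's lifetime
    private final Map<String, List<String>> files = new TreeMap<>();

    ReduceTaskModel(int partition) {
        this.partition = partition;
    }

    // The key is the base name; the task appends "-r-" and its own partition number.
    void write(String key, String value) {
        String fileName = String.format(Locale.ROOT, "%s-r-%05d", key, partition);
        files.computeIfAbsent(fileName, name -> new ArrayList<>())
             .add(key + "\t" + value); // key and value separated by a tab
    }

    Map<String, List<String>> files() {
        return files;
    }

    public static void main(String[] args) {
        // Records taken from the d001 output shown in the Results section
        ReduceTaskModel task = new ReduceTaskModel(2);
        task.write("d001", "Yetto\tLucian\t39682");
        task.write("d001", "Cooke\tPadma\t49634");
        task.write("d001", "Marrevee\tGiap\t49632");
        task.files().forEach((name, lines) ->
                System.out.println(name + ": " + lines.size() + " lines"));
        // prints: d001-r-00002: 3 lines
    }
}
```

In the actual run, reduce task 2 also received d005 and d009, so the same logic produced d005-r-00002 and d009-r-00002 alongside d001-r-00002.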
*******************************************
**Driver - DriverFormatMultiOutput.java
*******************************************
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverFormatMultiOutput extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Two parameters are required for DriverFormatMultiOutput: <input dir> <output dir>%n");
            return -1;
        }

        // Job.getInstance replaces the deprecated new Job(Configuration) constructor
        Job job = Job.getInstance(getConf(), "MultipleOutputs example");
        job.setJarByClass(DriverFormatMultiOutput.class);

        // Wrap TextOutputFormat so that empty part-r-nnnnn files are not created
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperFormatMultiOutput.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ReducerFormatMultiOutput.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Four reducers; each one may write files for several departments
        job.setNumReduceTasks(4);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverFormatMultiOutput(), args);
        System.exit(exitCode);
    }
}
*******************************************
**Commands to load data
*******************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
*******************************************
**Command to run program
*******************************************
$ hadoop jar ~/Blog/formatProject/formatMultiOutputFormat/jar/formatMultiOutput.jar \
    DriverFormatMultiOutput \
    /user/akhanolk/sortProject/data/employees/employees_tsv \
    /user/akhanolk/formatProject/data/output-formatMultiOutput
********************************
**Results
********************************
$ hadoop fs -ls -R formatProject/data/output-formatMultiOutput/d00* | awk '{print $8, $5}'
formatProject/data/output-formatMultiOutput/d001-r-00002 401857
formatProject/data/output-formatMultiOutput/d002-r-00003 336632
formatProject/data/output-formatMultiOutput/d003-r-00000 348770
formatProject/data/output-formatMultiOutput/d004-r-00001 1442822
formatProject/data/output-formatMultiOutput/d005-r-00002 1662566
formatProject/data/output-formatMultiOutput/d006-r-00003 394272
formatProject/data/output-formatMultiOutput/d007-r-00000 1020167
formatProject/data/output-formatMultiOutput/d009-r-00002 475747
$ hadoop fs -cat formatProject/data/output-formatMultiOutput/d001-r-00002 | less
d001 Yetto Lucian 39682
d001 Cooke Padma 49634
d001 Marrevee Giap 49632
..
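The partition suffixes in the listing follow from the partitioner. The driver does not set one, so the job uses the default HashPartitioner, which computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The standalone sketch below re-implements, from our reading of the Hadoop source, the arithmetic behind Text.hashCode() (WritableComparator.hashBytes: start at 1, then hash = 31 * hash + byte for each UTF-8 byte). The class and method names are hypothetical and there is no Hadoop dependency. With 4 reducers it reproduces the suffixes shown above, for example d001 to 00002 and d004 to 00001.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical standalone re-creation of default partitioning for Text keys.
class DeptPartitions {

    // Same arithmetic as Text.hashCode(): start at 1, fold in each byte with multiplier 31.
    static int textHash(String s) {
        int hash = 1;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            hash = 31 * hash + b;
        }
        return hash;
    }

    // HashPartitioner: clear the sign bit, then take the remainder.
    static int partition(String key, int numReduceTasks) {
        return (textHash(key) & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] depts = {"d001", "d002", "d003", "d004", "d005", "d006", "d007", "d009"};
        for (String d : depts) {
            // e.g. d001-r-00002, d002-r-00003, d003-r-00000, d004-r-00001
            System.out.println(String.format(Locale.ROOT, "%s-r-%05d", d, partition(d, 4)));
        }
    }
}
```

This also explains why reducer 2 wrote three files (d001, d005, d009) while the others wrote two each: the split depends only on the key hashes, not on how many records each department has.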
**********************
References:
**********************
- Hadoop: The Definitive Guide, 3rd edition
- Apache documentation on MultipleOutputs
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
- Apache documentation on LazyOutputFormat
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.html
**********************
Credits:
**********************
The data used in this solution is from the MySQL Employees sample database - http://dev.mysql.com/doc/employee/en/index.html