Skip to content

Instantly share code, notes, and snippets.

@airawat
Last active August 22, 2018 15:18
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save airawat/6639753 to your computer and use it in GitHub Desktop.
Save airawat/6639753 to your computer and use it in GitHub Desktop.
NLineInputFormat - About NLineInputFormat, uses, and a sample program
**********************
Gist
**********************
A common interview question for a Hadoop developer position is whether we can control the number of
mappers for a job. We can - there are a few ways of controlling the number of mappers, as needed.
Using NLineInputFormat is one way.
About NLineInputFormat
----------------------
With this functionality, you can specify exactly how many lines should go to a mapper.
E.g. If your file has 500 lines, and you set number of lines per mapper to 10, you have 50 mappers
(instead of one - assuming the file is smaller than a HDFS block size).
When would you use NLineInputFormat?
------------------------------------
Some examples from Hadoop the definitive guide-
1. In applications that take a small amount of input data and run an extensive (that is, CPU-intensive)
computation for it, then emit their output.
2. Another example...you create a “seed” input file that lists the data sources, one per line. Then
each mapper is allocated a single data source, and it loads the data from that source into HDFS.
Sample program
---------------
The sample program below demonstrates the functionality.
The mapper merely emits the input key-value pairs.
The input is a file with ~224,000 records.
The output is files containing 10,000 records each (so a total of 23 files).
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
data
employees_tsv
employees_tsv
formatNLineInputFormat
src
NLineInputFormat.java //Original Apache source code
MapperNLineInputFormat.java //Mapper
DriverNLineInputFormat.java //Driver
jar
formatNLineInputFormat.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
/*******************************************************************
* Mapper
* MapperNLineInputFormat.java
*******************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperNLineInputFormat extends
Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
/*******************************************************************
* Driver
* DriverNLineInputFormat.java
*******************************************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverNLineInputFormat extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("NLineInputFormat example");
job.setJarByClass(DriverNLineInputFormat.class);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, new Path(args[0]));
job.getConfiguration().setInt(
"mapreduce.input.lineinputformat.linespermap", 10000);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperNLineInputFormat.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverNLineInputFormat(), args);
System.exit(exitCode);
}
}
***********************************************
** Commands to load data
***********************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
***********************************************
** Commands to run the program
***********************************************
hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar DriverNLineInputFormat /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatNLineInputFormat
***********************************************
** Results
***********************************************
$ for filename in `hadoop fs -ls -R formatProject/data/output-formatNLineInputFormat/part* | awk '{print $8}'`
do
echo "Filename: " $filename " [Record count:" `hadoop fs -cat $filename | wc -l` "]"
done
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00000 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00001 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00002 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00003 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00004 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00005 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00006 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00007 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00008 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00009 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00010 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00011 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00012 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00013 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00014 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00015 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00016 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00017 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00018 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00019 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00020 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00021 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00022 [Record count: 4683 ]
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-* | wc -l
224683
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-m-00022
...
11474355 499977 1956-06-05 Martial Weisert F 1996-09-17 d002
11474407 499979 1962-10-29 Prasadram Waleschkowski M 1994-01-04 d005
11474467 499980 1959-06-28 Gino Usery M 1991-02-11 d007
..
/******************************************************
* NLineInputFormat.java
* Had to add this to the project, as the version of
* Hadoop I have does not include the NLineInputFormat
* functionality as part of the new API
*****************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
//import org.apache.hadoop.classification.InterfaceAudience;
//import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;
/**
* NLineInputFormat which splits N lines of input as one split.
*
* In many "pleasantly" parallel applications, each process/mapper processes the
* same input file (s), but with computations are controlled by different
* parameters.(Referred to as "parameter sweeps"). One way to achieve this, is
* to specify a set of parameters (one set per line) as input in a control file
* (which is the input path to the map-reduce application, where as the input
* dataset is specified via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits the input
* file such that by default, one line is fed as a value to one map task, and
* key is the offset. i.e. (k,v) is (LongWritable, Text). The location hints
* will span the whole mapred cluster.
*/
// @InterfaceAudience.Public
// @InterfaceStability.Stable
public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new LineRecordReader();
}
/**
* Logically splits the set of input files for the job, splits N lines of
* the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status, job.getConfiguration(),
numLinesPerSplit));
}
return splits;
}
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit>();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
// NLineInputFormat uses LineRecordReader, which always
// reads
// (and consumes) at least one character out of its upper
// split
// boundary. So to make sure that each mapper gets N lines,
// we
// move back the upper split limits of each split
// by one character here.
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
}
begin += length;
length = 0;
numLines = 0;
}
}
if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length,
new String[] {}));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}
/**
* Set the number of lines per split
*
* @param job
* the job to modify
* @param numLines
* the number of lines per split
*/
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
}
/**
* Get the number of lines per split
*
* @param job
* the job
* @return the number of lines per split
*/
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
}
}
@airawat
Copy link
Author

airawat commented Sep 20, 2013

My notes - please disregard

scp /Users/akhanolkar/Documents/hadoop-jars/formatNLineInputFormat.jar akhanolk@cdh-dev01:~/Blog/formatProject/formatNLineInputFormat/jar/

hadoop fs -rm -R formatProject/data/output-formatNLineInputFormat

hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar DriverNLineInputFormat /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatNLineInputFormat

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment