NLineInputFormat - About NLineInputFormat, uses, and a sample program
A common interview question for a Hadoop developer position is whether we can control the number of
mappers for a job. We can - there are a few ways of controlling the number of mappers, as needed.
Using NLineInputFormat is one way.
About NLineInputFormat
With this functionality, you can specify exactly how many lines should go to a mapper.
E.g. If your file has 500 lines, and you set number of lines per mapper to 10, you have 50 mappers
(instead of one - assuming the file is smaller than a HDFS block size).
When would you use NLineInputFormat?
Some examples from Hadoop the definitive guide-
1. In applications that take a small amount of input data and run an extensive (that is, CPU-intensive)
computation for it, then emit their output.
2. Another create a “seed” input file that lists the data sources, one per line. Then
each mapper is allocated a single data source, and it loads the data from that source into HDFS.
Sample program
The sample program below demonstrates the functionality.
The mapper merely emits the input key-value pairs.
The input is a file with ~224,000 records.
The output is files containing 10,000 records each (so a total of 23 files).
Data and code download
Data and code:
<<To be added>>
Email me at if you encounter any issues
Directory structure
src //Original Apache source code //Mapper //Driver
*Sample Data
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
* Mapper
import org.apache.hadoop.mapreduce.Mapper;
public class MapperNLineInputFormat extends
Mapper<LongWritable, Text, LongWritable, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
* Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverNLineInputFormat extends Configured implements Tool {
public int run(String[] args) throws Exception {
if (args.length != 2) {
.printf("Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
return -1;
Job job = new Job(getConf());
job.setJobName("NLineInputFormat example");
NLineInputFormat.addInputPath(job, new Path(args[0]));
"mapreduce.input.lineinputformat.linespermap", 10000);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
public static void main(String[] args) throws Exception {
int exitCode = Configuration(),
new DriverNLineInputFormat(), args);
** Commands to load data
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
** Commands to run the program
hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar DriverNLineInputFormat /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatNLineInputFormat
** Results
$ for filename in `hadoop fs -ls -R formatProject/data/output-formatNLineInputFormat/part* | awk '{print $8}'`
echo "Filename: " $filename " [Record count:" `hadoop fs -cat $filename | wc -l` "]"
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00000 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00001 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00002 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00003 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00004 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00005 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00006 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00007 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00008 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00009 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00010 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00011 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00012 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00013 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00014 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00015 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00016 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00017 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00018 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00019 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00020 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00021 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00022 [Record count: 4683 ]
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-* | wc -l
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-m-00022
11474355 499977 1956-06-05 Martial Weisert F 1996-09-17 d002
11474407 499979 1962-10-29 Prasadram Waleschkowski M 1994-01-04 d005
11474467 499980 1959-06-28 Gino Usery M 1991-02-11 d007
* Had to add this to the project, as the version of
* Hadoop I have does not include the NLineInputFormat
* functionality as part of the new API
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.util.ArrayList;
import java.util.List;
//import org.apache.hadoop.classification.InterfaceAudience;
//import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;
* NLineInputFormat which splits N lines of input as one split.
* In many "pleasantly" parallel applications, each process/mapper processes the
* same input file (s), but with computations are controlled by different
* parameters.(Referred to as "parameter sweeps"). One way to achieve this, is
* to specify a set of parameters (one set per line) as input in a control file
* (which is the input path to the map-reduce application, where as the input
* dataset is specified via a config variable in JobConf.).
* The NLineInputFormat can be used in such applications, that splits the input
* file such that by default, one line is fed as a value to one map task, and
* key is the offset. i.e. (k,v) is (LongWritable, Text). The location hints
* will span the whole mapred cluster.
// @InterfaceAudience.Public
// @InterfaceStability.Stable
public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
return new LineRecordReader();
* Logically splits the set of input files for the job, splits N lines of
* the input as one split.
* @see FileInputFormat#getSplits(JobContext)
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status, job.getConfiguration(),
return splits;
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit>();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in =;
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
while ((num = lr.readLine(line)) > 0) {
length += num;
if (numLines == numLinesPerSplit) {
// NLineInputFormat uses LineRecordReader, which always
// reads
// (and consumes) at least one character out of its upper
// split
// boundary. So to make sure that each mapper gets N lines,
// we
// move back the upper split limits of each split
// by one character here.
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
begin += length;
length = 0;
numLines = 0;
if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length,
new String[] {}));
} finally {
if (lr != null) {
return splits;
* Set the number of lines per split
* @param job
* the job to modify
* @param numLines
* the number of lines per split
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
* Get the number of lines per split
* @param job
* the job
* @return the number of lines per split
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
