@airawat
Last active March 27, 2024 04:57
CombineFileInputFormat - a solution for efficient MapReduce processing of small files
*************************
Gist
*************************
One more gist related to controlling the number of mappers in a MapReduce job.
Background on input splits
--------------------------
An input split is the chunk of input data allocated to a single map task for processing. FileInputFormat
generates input splits (and divides them into records) - one split per file, unless a file spans more than
one HDFS block, in which case it factors in the configured minimum split size, maximum split size and
block size to determine the split size.
Here's the formula, from Hadoop: The Definitive Guide -
Split size = max( minimumSplitSize, min( maximumSplitSize, HDFSBlockSize ))
So, if we go with the default values, the split size = HDFSBlockSize for files spanning more than an
HDFS block.
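For example, with the default minimum split size (1 byte), the default maximum split size (Long.MAX_VALUE)
and a 128 MB block size:
Split size = max( 1, min( Long.MAX_VALUE, 128 MB )) = 128 MB
so a 1 GB file is divided into eight 128 MB splits, while a 10 MB file yields a single 10 MB split.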
Problem with mapreduce processing of small files
-------------------------------------------------
Hadoop works best with large files, but in practice we still have to deal with small ones. When a
MapReduce job processes many small files, by default each file is handled by its own map task (so 1,000
small files = 1,000 map tasks). Launching that many tasks, each of which finishes in a matter of seconds,
is inefficient.
Increasing the minimum split size to reduce the number of map tasks is not the right fix either, as it
comes at the potential cost of data locality: a larger split is more likely to include blocks that do not
reside on the node running the map task.
Solution
---------
CombineFileInputFormat packs many files into each split, giving a map task more data to process. It
factors in node and rack locality when deciding which blocks to place in the same split, so performance
is not compromised.
Sample program
---------------
The sample program demonstrates that, using CombineFileInputFormat, we can process multiple small files
(each smaller than an HDFS block) in a single map task.
Old API
--------
The new API in the version of Hadoop I am running does not include CombineFileInputFormat, so this gist
uses the old (mapred) API. I will write another gist with the program using the new API shortly.
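As a rough pointer for later Hadoop releases (a sketch only, not part of the tested code in this gist; it
assumes a release that ships org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat, the new-API
combining input format for text), the driver-side configuration would look roughly like this:

Job job = Job.getInstance(new Configuration(), "combineSmallFiles");
job.setJarByClass(DriverCombineFileInputFormat.class);
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 134217728L); // 128 MB cap per combined split
CombineTextInputFormat.addInputPath(job, new Path(args[0]));
// ...plus the usual mapper, output format and output path settings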
Key aspects of the program
----------------------------
1. CombineFileInputFormat is an abstract class; we have to create a subclass that extends it and
implements the getRecordReader method. This implementation is in ExtendedCombineFileInputFormat.java
(courtesy - http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205)
2. In the driver, set the value of mapred.max.split.size
3. In the driver, set the input format to the subclass of CombineFileInputFormat (see the snippet below)
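In old-API terms, items 2 and 3 come down to two lines in the driver (shown in full further below):

conf.set("mapred.max.split.size", "134217728");             // 128 MB cap per combined split
conf.setInputFormat(ExtendedCombineFileInputFormat.class);  // pack small files into combined splits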
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
   data
      employees_partFiles
         employees_part1
         employees_part2
         employees_part3
         employees_part4
         employees_part5
   formatCombineFileInputFormat
      src
         MapperCombineFileInputFormat.java
         DriverCombineFileInputFormat.java
         ExtendedCombineFileInputFormat.java
      jar
         formatCombineFileInputFormatOAPI.jar
*******************************
Data Structure
*******************************
[EmpNo DOB FName LName Gender HireDate DeptNo]
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
.......
.......
*******************************
Expected Results
*******************************
Key goal of demonstration: Process 5 small files in one map task
Emit a subset of the input dataset.
[EmpNo FName LName]
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
/********************************************
*File: MapperCombineFileInputFormat.java
*Usage: Mapper
********************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperCombineFileInputFormat extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, Text> {

  Text txtKey = new Text("");
  Text txtValue = new Text("");

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {

    if (value.toString().length() > 0) {
      // Input record is tab-separated: EmpNo DOB FName LName Gender HireDate DeptNo
      String[] arrEmpAttributes = value.toString().split("\\t");

      // Emit EmpNo as the key; FName<tab>LName as the value
      txtKey.set(arrEmpAttributes[0]);
      txtValue.set(arrEmpAttributes[2] + "\t" + arrEmpAttributes[3]);

      output.collect(txtKey, txtValue);
    }
  }
}
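For example, given the tab-separated input record 10001 1953-09-02 Georgi Facello M 1986-06-26 d005,
the mapper emits the key 10001 and the value Georgi\tFacello.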
/********************************************
*File: DriverCombineFileInputFormat.java
*Usage: Driver
********************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverCombineFileInputFormat {

  public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(DriverCombineFileInputFormat.class);
    conf.setJobName("DriverCombineFileInputFormat");

    // Cap the size of a combined split; small files are packed into splits up to this size
    conf.set("mapred.max.split.size", "134217728"); // 128 MB

    String[] jobArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    conf.setMapperClass(MapperCombineFileInputFormat.class);

    // Use the CombineFileInputFormat subclass so many small files share one split/map task
    conf.setInputFormat(ExtendedCombineFileInputFormat.class);
    ExtendedCombineFileInputFormat.addInputPath(conf, new Path(jobArgs[0]));

    // Map-only job
    conf.setNumReduceTasks(0);

    conf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(conf, new Path(jobArgs[1]));
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    // runJob submits the job and polls until it completes
    RunningJob job = JobClient.runJob(conf);
    while (!job.isComplete()) {
      Thread.sleep(1000);
    }
    System.exit(job.isSuccessful() ? 0 : 2);
  }
}
/********************************************
*File: ExtendedCombineFileInputFormat.java
*Usage: Sub-class implementation of abstract
class CombineFileInputFormat
********************************************/
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class ExtendedCombineFileInputFormat extends
    CombineFileInputFormat<LongWritable, Text> {

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
      JobConf conf, Reporter reporter) throws IOException {
    // CombineFileRecordReader creates one myCombineFileRecordReader per file in the combined split
    return new CombineFileRecordReader(conf, (CombineFileSplit) split,
        reporter, (Class) myCombineFileRecordReader.class);
  }

  public static class myCombineFileRecordReader implements
      RecordReader<LongWritable, Text> {

    private final LineRecordReader linerecord;

    public myCombineFileRecordReader(CombineFileSplit split,
        Configuration conf, Reporter reporter, Integer index)
        throws IOException {
      // 'index' identifies which file within the combined split this reader handles
      FileSplit filesplit = new FileSplit(split.getPath(index),
          split.getOffset(index), split.getLength(index),
          split.getLocations());
      linerecord = new LineRecordReader(conf, filesplit);
    }

    @Override
    public void close() throws IOException {
      linerecord.close();
    }

    @Override
    public LongWritable createKey() {
      return linerecord.createKey();
    }

    @Override
    public Text createValue() {
      return linerecord.createValue();
    }

    @Override
    public long getPos() throws IOException {
      return linerecord.getPos();
    }

    @Override
    public float getProgress() throws IOException {
      return linerecord.getProgress();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
      return linerecord.next(key, value);
    }
  }
}
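A note on how the pieces fit together: CombineFileRecordReader iterates over the files packed into a
CombineFileSplit and creates one myCombineFileRecordReader per file (the Integer index argument identifies
the file within the split); each of those readers simply delegates to a standard LineRecordReader over
that file's byte range.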
*****************************
*HDFS command to load data
*****************************
hadoop fs -mkdir formatProject
hadoop fs -put formatProject/data formatProject/
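To confirm the five part files made it into HDFS (an optional check):

hadoop fs -ls formatProject/data/employees_partFiles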
*****************************
*Run program
*****************************
hadoop jar ~/Blog/formatProject/formatCombineFileInputFormat/jar/formatCombineFileInputFormatOAPI.jar DriverCombineFileInputFormat /user/akhanolk/formatProject/data/employees_partFiles /user/akhanolk/formatProject/output/output-CombineFileInputFormat
*****************************
*Results
*****************************
....
13/09/22 17:16:31 INFO mapred.JobClient: Launched map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Data-local map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=17885
...
$ hadoop fs -ls -R formatProject/output/output-CombineFileInputFormat/part* | awk '{print $8}'
formatProject/output/output-CombineFileInputFormat/part-00000
$ hadoop fs -cat formatProject/output/output-CombineFileInputFormat/part-00000
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
10004 Chirstian Koblick
10005 Kyoichi Maliniak
.....
**************************
References
**************************
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html
Concepts:
Hadoop: The Definitive Guide
Code:
http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205
Data:
The data in this solution is from the MySQL sample employees database - http://dev.mysql.com/doc/employee/en.index.html
@airawat (author) commented Sep 21, 2013

CombineFileInputFormat and large files

From Hadoop: The Definitive Guide -
"CombineFileInputFormat isn’t just good for small files; it can bring benefits when processing large files, too. Essentially, CombineFileInputFormat decouples the amount of data that a mapper consumes from the block size of the files in HDFS.

If your mappers can process each block in a matter of seconds, you could use CombineFileInputFormat with the maximum split size set to a small multiple of the number of blocks (by setting the mapred.max.split.size property in bytes) so that each mapper processes more than one block.

In return, the overall processing time falls, since proportionally fewer mappers run, which reduces the overhead in task bookkeeping and startup time associated with a large number of short-lived mappers."
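For instance (values illustrative), on a cluster with a 128 MB block size, letting each mapper consume
roughly two blocks would mean:

conf.set("mapred.max.split.size", "268435456"); // 2 x 128 MB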

@subhahere commented:

Thanks, the code compiles so neatly... I imported data from Sqoop and it created 4 files; I was able to
successfully run this code against those files to generate a single output file.

@ravindrabajpai commented:

I set isSplitable to return false, and here I see a problem. I have a workload with many small files (well
under the block size) and a couple of large files (well over the block size), and as a result two mappers
are lagging. If I set isSplitable to return true, it may break the last record of the last file (small or
large) in a split. How do I avoid that?

@bmentges commented:

This is awesome material, thanks for sharing!

@pucheng-tan commented:

Can CombineFileInputFormat combine small files if each file is compressed and encrypted?
