Example Hadoop Job that reads a cache file loaded from S3
// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopMain {

    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];
        String s3File = args[2];

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "twasink");
        job.setJarByClass(HadoopMain.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // s3File should be a URI with the s3: or s3n: scheme. The '#theFile' fragment
        // makes it accessible to tasks as a local file called 'theFile'.
        job.addCacheFile(new URI(s3File + "#theFile"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // Default implementation - pass the input straight through to the output.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        super.map(key, value, context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
            URI mappingFileUri = context.getCacheFiles()[0];
            if (mappingFileUri != null) {
                // It would be a good idea to inspect the URI here: the fragment after
                // the '#' is the local name the cache file was symlinked to.
                System.out.println("Mapping File: " + FileUtils.readFileToString(new File("./theFile")));
            } else {
                System.out.println(">>>>>> NO MAPPING FILE");
            }
        } else {
            System.out.println(">>>>>> NO CACHE FILES AT ALL");
        }
    }
}
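The setup comment above suggests inspecting the cache-file URI to recover the local file name rather than hard-coding './theFile'. A minimal sketch of such a helper (the class and method names here are ours, not part of the gist; Hadoop itself falls back to the last path segment when no fragment is given):

```java
import java.net.URI;

public class CacheFileName {
    // Returns the local symlink name for a cache-file URI such as
    // "s3n://bucket/path/data.txt#theFile": the fragment after '#'
    // if present, otherwise the last segment of the path.
    public static String localName(URI cacheFileUri) {
        String fragment = cacheFileUri.getFragment();
        if (fragment != null) {
            return fragment;
        }
        String path = cacheFileUri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) throws Exception {
        // With a fragment, the symlink name is the fragment.
        System.out.println(localName(new URI("s3n://bucket/path/data.txt#theFile")));
        // Without one, it is the file's own name.
        System.out.println(localName(new URI("s3://bucket/dir/mapping.csv")));
    }
}
```

With this, the mapper could read new File("./" + localName(mappingFileUri)) instead of the hard-coded name.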
@twasink (Owner Author) commented Feb 4, 2014

I had all sorts of problems getting my head around how cache files work with Hadoop. Finally, I stumbled across the answer: when you add a cache file (the addCacheFile call in HadoopMain), it's available to read as a local file inside the mapper (the setup method in MyMapper).

When running in Elastic MapReduce, the file URI can be an S3 file, using either s3://bucket/path or s3n://bucket/path - this may or may not work in other Hadoop implementations, but the general approach would work fine.

@rhema commented Feb 12, 2015

Super helpful. Thanks!

@AvaniShah commented Apr 13, 2016

Sir, I am getting an error for the getCacheFiles() method.
The error is: The method getCacheFiles() is undefined for the type Mapper<HipiImageHeader,FloatImage,IntWritable,IntWritable>.Context

May I get a solution for this as soon as possible?
