Example Hadoop Job that reads a cache file loaded from S3
// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopMain {

    // Expected arguments: <inputPath> <outputPath> <s3File>
    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];
        String s3File = args[2];

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "twasink");
        job.setJarByClass(HadoopMain.class);
        job.setMapperClass(MyMapper.class);
        // MyReducer is not shown in this gist
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // s3File should be a URI with the s3: or s3n: scheme. The '#theFile' fragment
        // makes it accessible to each task as a local file called 'theFile'.
        job.addCacheFile(new URI(s3File + "#theFile"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // Default implementation - pass the input to the output
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        super.map(key, value, context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The URIs registered with job.addCacheFile(), or null if there are none
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles != null && cacheFiles.length > 0) {
            URI mappingFileUri = cacheFiles[0];
            if (mappingFileUri != null) {
                // Would probably be a good idea to inspect the URI to see what the bit after the # is,
                // as that's the file name. Here it is hard-coded to match HadoopMain.
                System.out.println("Mapping File: " + FileUtils.readFileToString(new File("./theFile"), "UTF-8"));
            } else {
                System.out.println(">>>>>> NO MAPPING FILE");
            }
        } else {
            System.out.println(">>>>>> NO CACHE FILES AT ALL");
        }
    }
}

@twasink twasink commented Feb 4, 2014

I had all sorts of problems getting my head around how cache files work with Hadoop. Finally, I stumbled across the answer: when you add a cache file (see the job.addCacheFile call in HadoopMain), it's available to read as a local file inside the mapper (see the setup method in MyMapper), under the name given after the # in the URI.
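
The mapper above hard-codes the local name 'theFile'. A minimal sketch of deriving it from the cache URI instead, using only java.net.URI. The class and method names (CacheFileNames, localNameFor) are hypothetical, and the fallback to the last path segment when there is no fragment is my understanding of Hadoop's behaviour rather than something shown in this gist:

```java
import java.net.URI;

// Hypothetical helper, not part of the gist above.
class CacheFileNames {

    // Returns the name a cache file is expected to have in the task's working
    // directory: the URI fragment if there is one, otherwise (an assumption)
    // the last segment of the path.
    static String localNameFor(URI cacheFileUri) {
        String fragment = cacheFileUri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = cacheFileUri.getPath();
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("No fragment or path in: " + cacheFileUri);
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // Placeholder bucket and path, for illustration only
        System.out.println(localNameFor(URI.create("s3://bucket/path/mapping.txt#theFile")));
        System.out.println(localNameFor(URI.create("s3://bucket/path/mapping.txt")));
    }
}
```

In setup(), this would let you write something like new File(CacheFileNames.localNameFor(mappingFileUri)) rather than new File("./theFile").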

When running in Elastic MapReduce, the file URI can be an S3 file, using either s3://bucket/path or s3n://bucket/path - this may or may not work in other Hadoop implementations, but the general approach would work fine.
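
On the driver side, a rough sketch of building the cache URI with a scheme check before handing it to job.addCacheFile. The names CacheFileUris and cacheUriFor are hypothetical. Rejecting anything other than s3: or s3n: only mirrors the Elastic MapReduce case described here; you would relax that check for other file systems:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of the gist above.
class CacheFileUris {

    // Builds the URI to pass to job.addCacheFile(): the given S3 location with
    // linkName as its fragment. Any fragment already present is replaced.
    static URI cacheUriFor(String s3File, String linkName) {
        URI base = URI.create(s3File);
        String scheme = base.getScheme();
        if (!"s3".equals(scheme) && !"s3n".equals(scheme)) {
            throw new IllegalArgumentException("Expected an s3:// or s3n:// URI but got: " + s3File);
        }
        try {
            // scheme + scheme-specific part + fragment
            return new URI(scheme, base.getSchemeSpecificPart(), linkName);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build cache URI from: " + s3File, e);
        }
    }

    public static void main(String[] args) {
        // Placeholder bucket and path, for illustration only
        System.out.println(cacheUriFor("s3://bucket/path/mapping.txt", "theFile"));
    }
}
```

In HadoopMain, the call would then read job.addCacheFile(CacheFileUris.cacheUriFor(s3File, "theFile")).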

@rhema rhema commented Feb 12, 2015

Super helpful. Thanks!

@AvaniShah AvaniShah commented Apr 13, 2016

Sir, I am getting an error for the getCacheFiles() method.
The error is: The method getCacheFiles() is undefined for the type Mapper<HipiImageHeader,FloatImage,IntWritable,IntWritable>.Context

Is there a solution for this? I would appreciate one as soon as possible.
