Example Hadoop Job that reads a cache file loaded from S3
// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopMain {

    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];
        String s3File = args[2];

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "twasink");
        job.setJarByClass(HadoopMain.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // s3File should be a URI with the s3: or s3n: protocol. The '#theFile' fragment
        // makes it available to each task as a local file called 'theFile'.
        job.addCacheFile(new URI(s3File + "#theFile"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
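As a usage sketch, the three command-line arguments might look like the following. The paths and the bucket/key are invented, and only the bare S3 URI is passed in because HadoopMain appends the '#theFile' fragment itself.

// Hypothetical invocation of the driver above; the input/output paths and the
// S3 bucket/key are made-up examples, not values from the original gist.
package net.twasink.hadoop;

public class ExampleInvocation {
    public static void main(String[] args) throws Exception {
        HadoopMain.main(new String[] {
            "hdfs:///user/hadoop/input",           // inputPath  (args[0])
            "hdfs:///user/hadoop/output",          // outputPath (args[1])
            "s3n://my-bucket/lookup/mapping.txt"   // s3File     (args[2]) - exposed to tasks as './theFile'
        });
    }
}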
// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // Default implementation - pass the input straight through to the output.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        super.map(key, value, context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
            URI mappingFileUri = context.getCacheFiles()[0];
            if (mappingFileUri != null) {
                // Would probably be a good idea to inspect the URI to see what the bit after
                // the '#' is, as that's the local file name (see the sketch after this class).
                System.out.println("Mapping File: " + FileUtils.readFileToString(new File("./theFile"), "UTF-8"));
            } else {
                System.out.println(">>>>>> NO MAPPING FILE");
            }
        } else {
            System.out.println(">>>>>> NO CACHE FILES AT ALL");
        }
    }
}
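Following the comment in setup(), here is a minimal sketch of a variant that reads the alias from the URI fragment instead of hard-coding 'theFile'. The class name FragmentAwareMapper and the mappingData field are made up; the Hadoop and commons-io calls are the same ones used above.

package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FragmentAwareMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    private String mappingData;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles != null && cacheFiles.length > 0 && cacheFiles[0] != null) {
            // The bit after the '#' in the cache file URI is the symlink name
            // in the task's working directory, e.g. 'theFile' in HadoopMain.
            String alias = cacheFiles[0].getFragment();
            mappingData = FileUtils.readFileToString(new File("./" + alias), "UTF-8");
        }
    }
}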
Super helpful. Thanks!
Sir, I am getting an error for the getCacheFiles() method. May I get a solution for this as soon as possible?
I had all sorts of problems getting my head around how cache files work with Hadoop. Finally, I stumbled across the answer: when you add a cache file (the addCacheFile call in HadoopMain), it becomes readable as a local file inside the mapper (see MyMapper#setup).
When running in Elastic MapReduce, the file URI can be an S3 file, using either s3://bucket/path or s3n://bucket/path. This may or may not work in other Hadoop implementations, but the general approach should still work fine.
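Because Job#addCacheFile can be called more than once and Context#getCacheFiles returns an array, the same pattern extends to several files, each under its own alias. A minimal sketch, with invented bucket, key, and alias names:

package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: registers two cache files in the driver and shows how a mapper's
// setup() would read each one back via its alias. All names here are hypothetical.
public class MultipleCacheFiles {

    static void register(Job job) throws Exception {
        job.addCacheFile(new URI("s3n://my-bucket/lookup/countries.txt#countries"));
        job.addCacheFile(new URI("s3n://my-bucket/lookup/currencies.txt#currencies"));
    }

    // Called from a mapper's setup(): each alias is a symlink in the task's working directory.
    static String readAlias(String alias) throws IOException {
        return FileUtils.readFileToString(new File("./" + alias), "UTF-8");
    }
}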