Skip to content

Instantly share code, notes, and snippets.

@geofferyzh
Created October 2, 2012 02:22
Show Gist options
  • Save geofferyzh/3815808 to your computer and use it in GitHub Desktop.
Save geofferyzh/3815808 to your computer and use it in GitHub Desktop.
Hadoop 101 - Distributed Cache
// In the job driver code, specify cache file location, add file to distributed cache
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
/**
 * Job driver that uploads a local file to HDFS and registers it with the
 * Hadoop distributed cache before submitting the job.
 */
public class testdistcache extends Configured implements Tool {

    /** Path of the file to cache, on the local file system of the submitting machine. */
    public static final String localCacheFile = "localpath/localCacheFile.txt";

    /** HDFS destination the local file is copied to and cached from. */
    public static final String hdfsCacheFile = "hdfspath/hdfsCacheFile.txt";

    /**
     * Stages the cache file, configures the job and runs it to completion.
     *
     * @param args command-line arguments left over after generic option parsing (unused here)
     * @return 0 if the job succeeded, 1 if it failed
     * @throws Exception if staging the file or running the job fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Load the file into HDFS: keep the local source (delSrc=false) and
        // overwrite any existing copy at the destination (overwrite=true).
        FileSystem fs = FileSystem.get(conf);
        Path hdfsPath = new Path(hdfsCacheFile);
        fs.copyFromLocalFile(false, true, new Path(localCacheFile), hdfsPath);

        // Register the file with the distributed cache. This is done before the
        // Job is created because the Job works on its own copy of the configuration.
        DistributedCache.addCacheFile(hdfsPath.toUri(), conf);

        Job job = new Job(conf, "Test Distributed Cache");
        job.setJarByClass(testdistcache.class);
        // Other job-related configuration is omitted ...

        // Propagate the job outcome so that a failed job yields a non-zero exit code.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    //----------------------------------------------------------------------

    /**
     * Command-line entry point; exits the JVM with the status returned by {@link #run}.
     *
     * @param args command-line arguments, passed through {@code ToolRunner}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new testdistcache(), args);
        System.exit(exitCode);
    }
}
// In the mapper or reducer code, get distributed cache file name and read the cache file
import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
/**
 * Mapper that reads one line from the file registered in the distributed
 * cache during {@code setup} and keeps it available for the map calls.
 */
public static class TestDistCacheMapper extends Mapper<Text, Text, Text, Text> {

    // NOTE(review): which line of the cache file is needed is not visible in
    // this snippet; the first line is assumed -- confirm against the real job.
    private static final int CACHE_LINE_INDEX = 1;

    private HashMap<String, String> idmap;

    /** Line read from the cache file, or {@code null} if the requested line does not exist. */
    private String linecontent;

    /**
     * Locates the task-local copy of the cached file and loads the configured line from it.
     *
     * @param context task context giving access to the job configuration
     * @throws IOException if no cache file is available or it cannot be read
     */
    @Override
    public void setup(Context context) throws IOException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles == null || cacheFiles.length == 0) {
            throw new IOException("No file found in the distributed cache");
        }
        LoadDistCache(cacheFiles[0].toString(), CACHE_LINE_INDEX);
    }

    /**
     * Reads the line with the given 1-based number from a cache file into {@code linecontent}.
     *
     * @param hdfsCacheFile path of the cache file to read, as a local file-system path
     * @param linenum_index 1-based number of the line to keep
     * @throws IOException if the file cannot be opened or read
     */
    public void LoadDistCache(String hdfsCacheFile, int linenum_index) throws IOException {
        linecontent = null; // do not keep a value from a previous load if the line is missing
        BufferedReader bufferedReader = new BufferedReader(new FileReader(hdfsCacheFile));
        try {
            String line;
            int linenum = 0;
            while ((line = bufferedReader.readLine()) != null) {
                linenum = linenum + 1;
                if (linenum == linenum_index) {
                    linecontent = line;
                    break;
                }
            }
        } finally {
            // Close the reader even when reading fails part-way through.
            bufferedReader.close();
        }
    }

    /**
     * Emits one output record per input record.
     *
     * @param key input key
     * @param value input value
     * @param context task context used to emit output
     * @throws IOException if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // mapping details omitted (they are expected to produce outkey and outvalue)
        context.write(outkey, outvalue);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment