Skip to content

Instantly share code, notes, and snippets.

@geofferyzh
Created October 2, 2012 02:49
Show Gist options
  • Save geofferyzh/3815898 to your computer and use it in GitHub Desktop.
Save geofferyzh/3815898 to your computer and use it in GitHub Desktop.
Hadoop 101 - Side Data Distribution using Job Configuration
// Set the side data info in job configuration file
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Job driver that distributes a small piece of side data to tasks through the
 * job configuration: one line of a local text file is parsed as an integer and
 * stored under the configuration key {@code "passsidedata"}.
 */
public class SideDataDistribution extends Configured implements Tool {
    private static final Logger sLogger = Logger.getLogger(SideDataDistribution.class);

    // Path of the side data source on the local filesystem.
    public static final String sideData = ".../Initial_Activation_List.txt";

    // Line most recently selected by loadSideData; null when the requested line does not exist.
    String sideDataLine;

    /**
     * Reads the file at {@code sideData} and stores its {@code linenum_index}-th
     * line (1-based) in {@link #sideDataLine}.
     *
     * <p>{@link #sideDataLine} is reset to {@code null} first, so a stale value
     * from an earlier call is never mistaken for the result of this one.
     *
     * @param sideData      path of the text file to read
     * @param linenum_index 1-based number of the line to keep
     * @throws IOException if the file cannot be opened or read
     */
    public void loadSideData(String sideData, int linenum_index) throws IOException {
        sideDataLine = null;
        BufferedReader bufferedReader = new BufferedReader(new FileReader(sideData));
        try {
            String line;
            int linenum = 0;
            while ((line = bufferedReader.readLine()) != null) {
                linenum = linenum + 1;
                if (linenum == linenum_index) {
                    sideDataLine = line;
                    break;
                }
            }
        } finally {
            // Close the reader even when readLine() fails, so the file handle is not leaked.
            bufferedReader.close();
        }
    }

    //----------------------------------------------------------------------
    /**
     * Loads one line of side data, publishes it in the job configuration and
     * creates the job.
     *
     * @param arg0 command-line arguments (not used by the visible code)
     * @return 0
     * @throws IOException           if the side data file cannot be read or does
     *                               not contain the requested line
     * @throws NumberFormatException if the selected line is not an integer
     */
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        // Add custom config variables to configuration
        conf.addResource("extraconfiguration.xml");
        FileSystem hdfs = FileSystem.get(conf);

        // Load the side data and pass one line each time to mappers/reducers through jobconf.
        // NOTE(review): n_lines is not declared in the visible code - confirm where it is defined.
        loadSideData(sideData, n_lines);
        if (sideDataLine == null) {
            // Fail with a descriptive error instead of letting parseInt choke on null.
            throw new IOException("Line " + n_lines + " not found in side data file " + sideData);
        }
        // trim() tolerates stray whitespace around the number. The value is set before the
        // Job is constructed because Job works on its own copy of the configuration.
        conf.setInt("passsidedata", Integer.parseInt(sideDataLine.trim()));

        // Job
        Job job = new Job(conf, "load side data");
        job.setJarByClass(SideDataDistribution.class);
        ... // detailed configuration omitted
        return 0;
    }

    //----------------------------------------------------------------------
    /** Runs the driver through {@link ToolRunner} and exits with its return code. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new SideDataDistribution(), args);
        System.exit(exitCode);
    }
}
// In mapper/reducer tasks, retrieve the side data information from Job Conf
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
public class SideDataDistributionMapperextends Mapper<Text, VertexInfo, Text, VertexInfo> {
int sideDataLine;
// Get Configuration and Retrieve side data
@Override
public void setup(Context context) throws IOException {
// Get configuration
Configuration conf = context.getConfiguration();
sideDataLine= conf.getInt("passsidedata",0);
}
// Map Task ---------------------------------------------------------------------------
public void map(Text key, VertexInfo value, Context context) throws IOException, InterruptedException {
... // map tasks omitted
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment