Skip to content

Instantly share code, notes, and snippets.

@ramkicse
Last active August 29, 2015 14:12
Show Gist options
  • Save ramkicse/2395cb69866fcf37d0d6 to your computer and use it in GitHub Desktop.
Save ramkicse/2395cb69866fcf37d0d6 to your computer and use it in GitHub Desktop.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.dot.weather.join.mapside.withtool;
import com.dot.weather.join.mapside.*;
import com.dot.weather.join.reduceside.withtool.TemperatureVsHeightWithTool;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author sandeep
*/
public class TVsHMapSideJoinWithTool extends Configured implements Tool {
/**
-Dmapred.reduce.tasks=0
/home/sandeep/Desktop/HADOOP_Complete_Material/daily_weather_data/daily_waether_data_2014
/home/sandeep/Desktop/HADOOP_Complete_Material/daily_weather_data/station_data/201401station.txt
/home/sandeep/Desktop/HADOOP_Complete_Material/Hadoop_Projects/TVsHMapSideJoinWithTool
inner
*/
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
job.setJarByClass(DriverCode.class);
job.setJobName("MapSideJoin");
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
if (otherArgs.length != 4) {
System.err.println("Usage: <in1> <in2> <out> <jointType (inner|leftouter)>");
ToolRunner.printGenericCommandUsage(System.err);
System.exit(2);
}
String joinType = otherArgs[3];
job.getConfiguration().set("join.type", joinType);
// job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.addCacheFile(new URI(otherArgs[1]));
MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, MapperClass.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2] + File.separator + joinType));
int jobStatus = job.waitForCompletion(true) ? 0 : 1;
System.out.println("jobStatus=" + jobStatus);
return jobStatus;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new TVsHMapSideJoinWithTool(), args);
}
public static class TemperatureMapper extends
Mapper<Object, Text, Text, Text> {
private static final Text EMPTY_TEXT = new Text("-\t-");
private HashMap<String, Integer> stationCodeInformation = new HashMap<String, Integer>();
private Text outvalue = new Text();
private String joinType = null;
public void setup(Context context) throws IOException,
InterruptedException {
// Get the join type from the configuration
joinType = context.getConfiguration().get("join.type");
Path[] files
= DistributedCache.getLocalCacheFiles(context.getConfiguration());
// Read all files in the DistributedCache
for (Path p : files) {
BufferedReader rdr = new BufferedReader(
new InputStreamReader(
new FileInputStream(
new File(p.toString()))));
String line = null;
// For each record in the user file
while ((line = rdr.readLine()) != null) {
// Get the user ID for this record
if (line == null || line.toString().isEmpty() || line.toString().length() <= 1) {
} else {
try {
String[] split = line.toString().split("\\|");
String stationCode = split[0].trim();
Text stationCodeKey = new Text(stationCode);
String heightData = split[11].trim();
int height = Integer.parseInt(heightData);
System.out.println("split code = " + split[0] + "\tHeight=" + height);
IntWritable heightCorrect = new IntWritable(height);
stationCodeInformation.put(stationCodeKey.toString(), heightCorrect.get());
} catch (Exception exception) {
System.out.println(exception);
}
//List<Integer> list = stationCodeInformation.get(stationCodeKey.toString());
}
}
}
}
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
try {
if (value == null || value.toString().isEmpty() || value.toString().length() <= 1) {
} else {
String[] split = value.toString().split(",");
String stationCode = split[0].trim();
Text stationCodeKey = new Text(stationCode);
String dateData = split[1].trim();
String maxTData = split[2].trim();
int maxT = Integer.parseInt(maxTData);
String minTData = split[4].trim();
int minT = Integer.parseInt(minTData);
IntWritable maxTCorrect = new IntWritable(maxT);
IntWritable minTCorrect = new IntWritable(minT);
Integer height = stationCodeInformation.get(stationCodeKey.toString());
Text tempDataValue = new Text(stationCode + "\t" + dateData + "\t" + maxTCorrect.get() + "\t" + minTCorrect.get());
if (height != null) {
context.write(tempDataValue, new Text(stationCodeKey.toString() + "\t" + height));
} else {
if (joinType.equals("leftouter")) {
context.write(tempDataValue, EMPTY_TEXT);
}
}
}
} catch (Throwable t) {
System.out.println(t);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment