Last active
August 29, 2015 14:12
-
-
Save ramkicse/2395cb69866fcf37d0d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* To change this license header, choose License Headers in Project Properties. | |
* To change this template file, choose Tools | Templates | |
* and open the template in the editor. | |
*/ | |
package com.dot.weather.join.mapside.withtool; | |
import com.dot.weather.join.mapside.*; | |
import com.dot.weather.join.reduceside.withtool.TemperatureVsHeightWithTool; | |
import java.io.BufferedReader; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.net.URI; | |
import java.util.HashMap; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.util.GenericOptionsParser; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
/** | |
* | |
* @author sandeep | |
*/ | |
public class TVsHMapSideJoinWithTool extends Configured implements Tool { | |
/** | |
-Dmapred.reduce.tasks=0 | |
/home/sandeep/Desktop/HADOOP_Complete_Material/daily_weather_data/daily_waether_data_2014 | |
/home/sandeep/Desktop/HADOOP_Complete_Material/daily_weather_data/station_data/201401station.txt | |
/home/sandeep/Desktop/HADOOP_Complete_Material/Hadoop_Projects/TVsHMapSideJoinWithTool | |
inner | |
*/ | |
@Override | |
public int run(String[] args) throws Exception { | |
Job job = Job.getInstance(getConf()); | |
job.setJarByClass(DriverCode.class); | |
job.setJobName("MapSideJoin"); | |
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); | |
if (otherArgs.length != 4) { | |
System.err.println("Usage: <in1> <in2> <out> <jointType (inner|leftouter)>"); | |
ToolRunner.printGenericCommandUsage(System.err); | |
System.exit(2); | |
} | |
String joinType = otherArgs[3]; | |
job.getConfiguration().set("join.type", joinType); | |
// job.setNumReduceTasks(0); | |
job.setMapOutputKeyClass(Text.class); | |
job.setMapOutputValueClass(Text.class); | |
job.addCacheFile(new URI(otherArgs[1])); | |
MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, MapperClass.class); | |
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2] + File.separator + joinType)); | |
int jobStatus = job.waitForCompletion(true) ? 0 : 1; | |
System.out.println("jobStatus=" + jobStatus); | |
return jobStatus; | |
} | |
public static void main(String[] args) throws Exception { | |
ToolRunner.run(new Configuration(), new TVsHMapSideJoinWithTool(), args); | |
} | |
public static class TemperatureMapper extends | |
Mapper<Object, Text, Text, Text> { | |
private static final Text EMPTY_TEXT = new Text("-\t-"); | |
private HashMap<String, Integer> stationCodeInformation = new HashMap<String, Integer>(); | |
private Text outvalue = new Text(); | |
private String joinType = null; | |
public void setup(Context context) throws IOException, | |
InterruptedException { | |
// Get the join type from the configuration | |
joinType = context.getConfiguration().get("join.type"); | |
Path[] files | |
= DistributedCache.getLocalCacheFiles(context.getConfiguration()); | |
// Read all files in the DistributedCache | |
for (Path p : files) { | |
BufferedReader rdr = new BufferedReader( | |
new InputStreamReader( | |
new FileInputStream( | |
new File(p.toString())))); | |
String line = null; | |
// For each record in the user file | |
while ((line = rdr.readLine()) != null) { | |
// Get the user ID for this record | |
if (line == null || line.toString().isEmpty() || line.toString().length() <= 1) { | |
} else { | |
try { | |
String[] split = line.toString().split("\\|"); | |
String stationCode = split[0].trim(); | |
Text stationCodeKey = new Text(stationCode); | |
String heightData = split[11].trim(); | |
int height = Integer.parseInt(heightData); | |
System.out.println("split code = " + split[0] + "\tHeight=" + height); | |
IntWritable heightCorrect = new IntWritable(height); | |
stationCodeInformation.put(stationCodeKey.toString(), heightCorrect.get()); | |
} catch (Exception exception) { | |
System.out.println(exception); | |
} | |
//List<Integer> list = stationCodeInformation.get(stationCodeKey.toString()); | |
} | |
} | |
} | |
} | |
public void map(Object key, Text value, Context context) | |
throws IOException, InterruptedException { | |
try { | |
if (value == null || value.toString().isEmpty() || value.toString().length() <= 1) { | |
} else { | |
String[] split = value.toString().split(","); | |
String stationCode = split[0].trim(); | |
Text stationCodeKey = new Text(stationCode); | |
String dateData = split[1].trim(); | |
String maxTData = split[2].trim(); | |
int maxT = Integer.parseInt(maxTData); | |
String minTData = split[4].trim(); | |
int minT = Integer.parseInt(minTData); | |
IntWritable maxTCorrect = new IntWritable(maxT); | |
IntWritable minTCorrect = new IntWritable(minT); | |
Integer height = stationCodeInformation.get(stationCodeKey.toString()); | |
Text tempDataValue = new Text(stationCode + "\t" + dateData + "\t" + maxTCorrect.get() + "\t" + minTCorrect.get()); | |
if (height != null) { | |
context.write(tempDataValue, new Text(stationCodeKey.toString() + "\t" + height)); | |
} else { | |
if (joinType.equals("leftouter")) { | |
context.write(tempDataValue, EMPTY_TEXT); | |
} | |
} | |
} | |
} catch (Throwable t) { | |
System.out.println(t); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment