Created
December 1, 2023 14:10
-
-
Save Vatsal596/fd154138bc6b17dc0495953e6a521680 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class DriverJoin extends Configured { | |
public static void main(String[] args) throws Exception { | |
if (args.length != 3) { | |
System.err.println("Usage : <inputlocation> <inputlocation> <outputlocation> "); | |
System.exit(0); | |
} | |
else | |
{ | |
String source1 = args[0]; | |
String source2 = args[1]; | |
String dest = args[2]; | |
Configuration conf = new Configuration(); | |
conf.set("mapreduce.output.textoutputformat.separator", "\t"); | |
FileSystem fs = FileSystem.get(conf); | |
Job job = Job.getInstance(conf, "Multiple Jobs"); | |
job.setJarByClass(DriverJoin.class); | |
//job.setJar("Dept_join.jar"); | |
Path p1 = new Path(source1); | |
Path p2 = new Path(source2); | |
Path out = new Path(dest); | |
MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MapperJoin.class); | |
MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MapperJoin2.class); | |
job.setReducerClass(ReducerJoin.class); | |
job.setMapOutputKeyClass(Text.class); | |
job.setMapOutputValueClass(Text.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(Text.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
/* | |
* delete if exist | |
*/ | |
if (fs.exists(out)) | |
fs.delete(out, true); | |
FileOutputFormat.setOutputPath(job, out); | |
boolean success = job.waitForCompletion(true); | |
System.exit(success? 0 : 1); | |
} | |
//int res = ToolRunner.run(new Configuration(), new DriverJoin(), args); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
public class MapperJoin extends Mapper<LongWritable, Text, Text, Text> { | |
Text keyEmit = new Text(); | |
Text valEmit = new Text(); | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
String data[]=value.toString().split(","); | |
String deptId=data[0]; | |
String deptName=data[1]; | |
context.write(new Text(deptId), new Text(deptName)); | |
/*String line = value.toString(); | |
String parts[] = line.split(","); | |
keyEmit.set(parts[0]); | |
valEmit.set(parts[1]); | |
context.write(keyEmit, valEmit);*/ | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
public class MapperJoin2 extends Mapper<LongWritable, Text, Text, Text> { | |
Text keyEmit = new Text(); | |
Text valEmit = new Text(); | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
String data[]=value.toString().split(","); | |
String deptId=data[0]; | |
String empStrength=data[1]; | |
context.write(new Text(deptId), new Text(empStrength)); | |
/* String line = value.toString(); | |
String parts[] = line.split(","); | |
keyEmit.set(parts[0]); | |
valEmit.set(parts[1]); | |
context.write(keyEmit, valEmit);*/ | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Reducer; | |
public class ReducerJoin extends Reducer<Text, Text, Text, Text> { | |
Text valEmit = new Text(); | |
String merge = ""; | |
public void reduce(Text key, Iterable<Text> values, Context context) | |
throws IOException, InterruptedException { | |
String character = ""; | |
String number = ""; | |
for (Text value : values) { | |
// ordering output | |
String val = value.toString(); | |
char myChar = val.charAt(0); | |
if (Character.isDigit(myChar)) { | |
number = val; | |
} else { | |
character = val; | |
} | |
} | |
merge = character + " " + number; | |
valEmit.set(merge); | |
context.write(key, valEmit); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment