Last active
September 26, 2018 21:16
-
-
Save PatrikValkovic/81fac6b33acd6b8e064e5483459526fe to your computer and use it in GitHub Desktop.
Hadoop cartesian product using MapReduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.DoubleWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import java.io.IOException; | |
public class BiGrams { | |
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { | |
Path input1 = new Path(args[0]); | |
Path input2 = new Path(args[1]); | |
Path output = new Path(args[2]); | |
Configuration conf = new Configuration(); | |
conf.set(Jaccard.GRAMSIZE, "2"); | |
Job job1 = Job.getInstance(conf, "single word count"); | |
job1.setJarByClass(Jaccard.class); | |
job1.setMapperClass(Jaccard.Map.class); | |
job1.setOutputKeyClass(DoubleWritable.class); | |
job1.setOutputValueClass(Text.class); | |
CartesianInputFormat.setLeftInput(TextInputFormat.class, input1); | |
CartesianInputFormat.setRightInput(TextInputFormat.class, input2); | |
job1.setInputFormatClass(CartesianInputFormat.class); | |
FileOutputFormat.setOutputPath(job1, output); | |
if(!job1.waitForCompletion(true)) | |
System.exit(1); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.*; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit; | |
import org.apache.hadoop.util.ReflectionUtils; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
/**
 * InputFormat that yields the Cartesian product of two file inputs: one
 * {@link CompositeInputSplit} per (left split, right split) pair, so each map
 * task sees every record of one left split combined with every record of one
 * right split (pairing is done by {@link CartesianRecordReader}).
 *
 * NOTE(review): the left/right format classes and paths live in static fields
 * set by the driver before job submission. Statics are per-JVM, so this relies
 * on getSplits/createRecordReader running where the statics were set; they are
 * not shipped to remote tasks through the job Configuration — confirm this
 * works on an actual cluster and not just in local mode.
 */
public class CartesianInputFormat extends InputFormat {
    // Configured by the driver via setLeftInput/setRightInput (see class note).
    static private Class<? extends FileInputFormat> leftInputFormat;
    static private Path leftInputFile = null;
    static private Class<? extends FileInputFormat> rightInputFormat;
    static private Path rightInputFile = null;

    /** Registers the input format and path for the left side of the product. */
    public static void setLeftInput(Class<? extends FileInputFormat> inputFormat, Path inputFile){
        CartesianInputFormat.leftInputFormat = inputFormat;
        CartesianInputFormat.leftInputFile = inputFile;
    }

    /** Registers the input format and path for the right side of the product. */
    public static void setRightInput(Class<? extends FileInputFormat> inputFormat, Path inputFile){
        CartesianInputFormat.rightInputFormat = inputFormat;
        CartesianInputFormat.rightInputFile = inputFile;
    }

    /**
     * Splits each side independently, then returns every (left, right) pair as
     * a two-element CompositeInputSplit: slot 0 = left, slot 1 = right (the
     * order CartesianRecordReader depends on).
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        // Copy of the job so the max-split-size override below does not leak
        // into the caller's context.
        Job jobCopy = Job.getInstance(conf);
        FileInputFormat leftInput = ReflectionUtils.newInstance(leftInputFormat, conf);
        FileInputFormat rightInput = ReflectionUtils.newInstance(rightInputFormat, conf);

        // Heuristic: shrink the per-side split size to the 4th root of the
        // configured maximum, so composite splits (whose record count is the
        // PRODUCT of the two sides) stay manageable. If the shrunken size
        // would be <= 1 KB, keep the original size instead.
        long splitSize = FileInputFormat.getMaxSplitSize(job);
        long newSplitSize = (long)Math.pow(splitSize, 0.25);
        if(newSplitSize <= 1024)
            newSplitSize = splitSize;
        FileInputFormat.setMaxInputSplitSize(jobCopy, newSplitSize);

        // Re-point the same job copy at each input in turn to compute splits.
        FileInputFormat.setInputPaths(jobCopy, leftInputFile);
        List<InputSplit> leftSplits = leftInput.getSplits(jobCopy);
        FileInputFormat.setInputPaths(jobCopy, rightInputFile);
        List<InputSplit> rightSplits = rightInput.getSplits(jobCopy);

        // Full cross product: |left| * |right| composite splits.
        List<InputSplit> product = new ArrayList<>(leftSplits.size() * rightSplits.size());
        for(InputSplit s1 : leftSplits)
            for(InputSplit s2: rightSplits){
                CompositeInputSplit s = new CompositeInputSplit(2);
                s.add(s1);
                s.add(s2);
                product.add(s);
            }
        return product;
    }

    /** Hands the composite split to a reader that enumerates record pairs. */
    @Override
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new CartesianRecordReader((CompositeInputSplit)inputSplit, leftInputFormat, rightInputFormat);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.InputSplit; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit; | |
import org.apache.hadoop.util.ReflectionUtils; | |
import java.io.IOException; | |
public class CartesianRecordReader extends RecordReader { | |
private CompositeInputSplit split; | |
private Class<? extends FileInputFormat> leftInputFormatClass; | |
private Class<? extends FileInputFormat> rightInputFormatClass; | |
private FileInputFormat leftInputFormat; | |
private FileInputFormat rightInputFormat; | |
private RecordReader leftReader; | |
private RecordReader rightReader; | |
private TaskAttemptContext context; | |
private boolean inicialized; | |
public CartesianRecordReader(CompositeInputSplit split, Class<? extends FileInputFormat> leftInputFormatClass, Class<? extends FileInputFormat> rightInputFormat) { | |
this.split = split; | |
this.leftInputFormatClass = leftInputFormatClass; | |
this.rightInputFormatClass = rightInputFormat; | |
} | |
@Override | |
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { | |
leftInputFormat = ReflectionUtils.newInstance(leftInputFormatClass, taskAttemptContext.getConfiguration()); | |
rightInputFormat = ReflectionUtils.newInstance(rightInputFormatClass, taskAttemptContext.getConfiguration()); | |
leftReader = leftInputFormat.createRecordReader(split.get(0), taskAttemptContext); | |
rightReader = rightInputFormat.createRecordReader(split.get(1), taskAttemptContext); | |
leftReader.initialize(split.get(0), taskAttemptContext); | |
rightReader.initialize(split.get(1), taskAttemptContext); | |
inicialized = false; | |
context = taskAttemptContext; | |
} | |
@Override | |
public boolean nextKeyValue() throws IOException, InterruptedException { | |
try { | |
if(!inicialized){ | |
if(!leftReader.nextKeyValue()) | |
return false; | |
inicialized = true; | |
} | |
while(!rightReader.nextKeyValue()) { | |
if (!leftReader.nextKeyValue()) | |
return false; | |
rightReader.close(); | |
rightReader = rightInputFormat.createRecordReader(split.get(1), context); | |
rightReader.initialize(split.get(1), context); | |
} | |
return true; | |
} | |
catch(Exception e){ | |
e.printStackTrace(); | |
throw e; | |
} | |
} | |
@Override | |
public Object getCurrentKey() throws IOException, InterruptedException { | |
return NullWritable.get(); | |
} | |
@Override | |
public Object getCurrentValue() throws IOException, InterruptedException { | |
try { | |
Object leftValue = leftReader.getCurrentValue(); | |
Object rightValue = rightReader.getCurrentValue(); | |
if(leftValue == null || rightValue == null) | |
return null; | |
String s = leftValue.toString() + " " + rightValue.toString(); | |
return new Text(s); | |
} | |
catch(Exception e){ | |
e.printStackTrace(); | |
throw e; | |
} | |
} | |
@Override | |
public float getProgress() throws IOException, InterruptedException { | |
double leftProgress = leftReader.getProgress(); | |
double rightProgress = rightReader.getProgress(); | |
double totalProgress = leftProgress * rightProgress; | |
return (float)totalProgress; | |
} | |
@Override | |
public void close() throws IOException { | |
leftReader.close(); | |
rightReader.close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.io.DoubleWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import java.io.IOException; | |
import java.util.HashSet; | |
import java.util.Set; | |
public class Jaccard { | |
public static final String TRESHOLD = "treshold"; | |
public static final String GRAMSIZE = "gramsize"; | |
public static class Map extends Mapper<Object, Text, DoubleWritable, Text> { | |
private double treshold = 0.85; | |
private int gramsize = 2; | |
private DoubleWritable d = new DoubleWritable(); | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
Configuration conf = context.getConfiguration(); | |
treshold = Double.parseDouble(conf.get(Jaccard.TRESHOLD, "0.85")); | |
gramsize = Integer.parseUnsignedInt(conf.get(Jaccard.GRAMSIZE, "2")); | |
} | |
@Override | |
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { | |
String content = value.toString().trim(); | |
String[] words = content.split(" "); | |
String leftWord = words[0]; | |
int leftWordLength = leftWord.length(); | |
Set<String> leftSet = new HashSet<>(); | |
for(int i=gramsize;i<=leftWordLength;i++) | |
leftSet.add(leftWord.substring(i-gramsize,i)); | |
String rightWord = words[1]; | |
int rightWordLength = rightWord.length(); | |
Set<String> rightSet = new HashSet<>(); | |
for(int i=gramsize;i<=rightWordLength;i++) | |
rightSet.add(rightWord.substring(i-gramsize,i)); | |
Set<String> union = new HashSet<>(leftSet); | |
union.addAll(rightSet); | |
Set<String> intersection = new HashSet<>(leftSet); | |
intersection.retainAll(rightSet); | |
int intersectionSize = intersection.size(); | |
int unionSize = union.size(); | |
if(intersectionSize != 0 && unionSize != 0){ | |
double result = (double)intersectionSize / (double)unionSize; | |
if(result > treshold && result < 1.0-10E-12){ | |
d.set(result); | |
context.write(d, value); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment