@PatrikValkovic
Last active September 26, 2018 21:16
Hadoop cartesian product using MapReduce
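The gist consists of four classes: BiGrams is the job driver, CartesianInputFormat and CartesianRecordReader build the cartesian product of two text inputs (every record of the first input paired with every record of the second), and Jaccard.Map computes the bigram Jaccard similarity of each pair. Each input file is expected to hold one word per line.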
// BiGrams.java (job driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class BiGrams {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input1 = new Path(args[0]);
        Path input2 = new Path(args[1]);
        Path output = new Path(args[2]);

        Configuration conf = new Configuration();
        // Use bigrams; the similarity threshold is left at the mapper's default of 0.85
        // and could be overridden here via conf.set(Jaccard.THRESHOLD, "...").
        conf.set(Jaccard.GRAMSIZE, "2");

        Job job1 = Job.getInstance(conf, "jaccard bigram similarity");
        job1.setJarByClass(Jaccard.class);
        job1.setMapperClass(Jaccard.Map.class);
        job1.setOutputKeyClass(DoubleWritable.class);
        job1.setOutputValueClass(Text.class);

        // Pair every record of the first input with every record of the second one.
        CartesianInputFormat.setLeftInput(TextInputFormat.class, input1);
        CartesianInputFormat.setRightInput(TextInputFormat.class, input2);
        job1.setInputFormatClass(CartesianInputFormat.class);

        FileOutputFormat.setOutputPath(job1, output);

        if (!job1.waitForCompletion(true))
            System.exit(1);
    }
}
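Since the driver sets no reducer and no explicit output format, the job runs the identity reduce step and writes plain text: each output line is the similarity score, a tab, and the two compared words, sorted by score within each output file.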
// CartesianInputFormat.java (pairs every split of the left input with every split of the right input)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CartesianInputFormat extends InputFormat<NullWritable, Text> {
    // Note: the inputs are kept in static fields, so they are only visible
    // when everything runs in the same JVM (e.g. the local job runner).
    static private Class<? extends FileInputFormat> leftInputFormat;
    static private Path leftInputFile = null;
    static private Class<? extends FileInputFormat> rightInputFormat;
    static private Path rightInputFile = null;

    public static void setLeftInput(Class<? extends FileInputFormat> inputFormat, Path inputFile) {
        CartesianInputFormat.leftInputFormat = inputFormat;
        CartesianInputFormat.leftInputFile = inputFile;
    }

    public static void setRightInput(Class<? extends FileInputFormat> inputFormat, Path inputFile) {
        CartesianInputFormat.rightInputFormat = inputFormat;
        CartesianInputFormat.rightInputFile = inputFile;
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        Job jobCopy = Job.getInstance(conf);
        FileInputFormat leftInput = ReflectionUtils.newInstance(leftInputFormat, conf);
        FileInputFormat rightInput = ReflectionUtils.newInstance(rightInputFormat, conf);

        // Use much smaller splits than usual so that each map task, which enumerates the
        // full cross product of its two splits, does not get too much work; keep the
        // original size if the shrunken value would be unreasonably small (<= 1 KB).
        long splitSize = FileInputFormat.getMaxSplitSize(job);
        long newSplitSize = (long) Math.pow(splitSize, 0.25);
        if (newSplitSize <= 1024)
            newSplitSize = splitSize;
        FileInputFormat.setMaxInputSplitSize(jobCopy, newSplitSize);

        FileInputFormat.setInputPaths(jobCopy, leftInputFile);
        List<InputSplit> leftSplits = leftInput.getSplits(jobCopy);
        FileInputFormat.setInputPaths(jobCopy, rightInputFile);
        List<InputSplit> rightSplits = rightInput.getSplits(jobCopy);

        // Every (left split, right split) pair becomes one CompositeInputSplit,
        // so m left splits and n right splits produce m * n map tasks.
        List<InputSplit> product = new ArrayList<>(leftSplits.size() * rightSplits.size());
        for (InputSplit s1 : leftSplits)
            for (InputSplit s2 : rightSplits) {
                CompositeInputSplit s = new CompositeInputSplit(2);
                s.add(s1);
                s.add(s2);
                product.add(s);
            }
        return product;
    }

    @Override
    public RecordReader<NullWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new CartesianRecordReader((CompositeInputSplit) inputSplit, leftInputFormat, rightInputFormat);
    }
}
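Because the input formats and paths live in static fields, they are set in the submitting JVM and visible to getSplits() there, but not to createRecordReader() when map tasks run in separate JVMs on a real cluster; the code as written is therefore best suited to the local job runner. A minimal sketch of carrying the same information through the job Configuration instead (the helper class name and the cartesian.left.* keys are made up for this sketch; the right-hand side would mirror it, and getSplits()/createRecordReader() would read the values back from the job or task Configuration):

// CartesianConf.java (hypothetical helper, not part of the original gist)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class CartesianConf {
    // Configuration keys invented for this sketch.
    public static final String LEFT_FORMAT = "cartesian.left.format";
    public static final String LEFT_PATH = "cartesian.left.path";

    // Called by the driver: store the left input format and path in the job Configuration,
    // which Hadoop ships to every task JVM.
    public static void setLeftInput(Job job, Class<? extends FileInputFormat> format, Path path) {
        job.getConfiguration().setClass(LEFT_FORMAT, format, FileInputFormat.class);
        job.getConfiguration().set(LEFT_PATH, path.toString());
    }

    // Called inside the InputFormat/RecordReader: rebuild the left input format from the Configuration.
    public static FileInputFormat newLeftFormat(Configuration conf) {
        Class<? extends FileInputFormat> cls =
                conf.getClass(LEFT_FORMAT, TextInputFormat.class, FileInputFormat.class);
        return ReflectionUtils.newInstance(cls, conf);
    }

    // Called inside the InputFormat: recover the left input path from the Configuration.
    public static Path leftPath(Configuration conf) {
        return new Path(conf.get(LEFT_PATH));
    }
}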
// CartesianRecordReader.java (emits every (left record, right record) pair of a CompositeInputSplit)
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public class CartesianRecordReader extends RecordReader<NullWritable, Text> {
    private CompositeInputSplit split;
    private Class<? extends FileInputFormat> leftInputFormatClass;
    private Class<? extends FileInputFormat> rightInputFormatClass;
    private FileInputFormat leftInputFormat;
    private FileInputFormat rightInputFormat;
    private RecordReader leftReader;
    private RecordReader rightReader;
    private TaskAttemptContext context;
    private boolean initialized;

    public CartesianRecordReader(CompositeInputSplit split, Class<? extends FileInputFormat> leftInputFormatClass, Class<? extends FileInputFormat> rightInputFormatClass) {
        this.split = split;
        this.leftInputFormatClass = leftInputFormatClass;
        this.rightInputFormatClass = rightInputFormatClass;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        leftInputFormat = ReflectionUtils.newInstance(leftInputFormatClass, taskAttemptContext.getConfiguration());
        rightInputFormat = ReflectionUtils.newInstance(rightInputFormatClass, taskAttemptContext.getConfiguration());
        leftReader = leftInputFormat.createRecordReader(split.get(0), taskAttemptContext);
        rightReader = rightInputFormat.createRecordReader(split.get(1), taskAttemptContext);
        leftReader.initialize(split.get(0), taskAttemptContext);
        rightReader.initialize(split.get(1), taskAttemptContext);
        initialized = false;
        context = taskAttemptContext;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            // On the first call, position the left reader on its first record.
            if (!initialized) {
                if (!leftReader.nextKeyValue())
                    return false;
                initialized = true;
            }
            // Advance the right reader; when it is exhausted, advance the left reader
            // and restart the right reader from the beginning of its split.
            while (!rightReader.nextKeyValue()) {
                if (!leftReader.nextKeyValue())
                    return false;
                rightReader.close();
                rightReader = rightInputFormat.createRecordReader(split.get(1), context);
                rightReader.initialize(split.get(1), context);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        try {
            Object leftValue = leftReader.getCurrentValue();
            Object rightValue = rightReader.getCurrentValue();
            if (leftValue == null || rightValue == null)
                return null;
            // The mapper receives "leftRecord rightRecord" as a single Text value.
            return new Text(leftValue.toString() + " " + rightValue.toString());
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Rough estimate only: the product of the two readers' progress.
        return (float) (leftReader.getProgress() * rightReader.getProgress());
    }

    @Override
    public void close() throws IOException {
        leftReader.close();
        rightReader.close();
    }
}
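The reader above effectively walks a nested loop: it holds one left record, replays the whole right split for it, and recreates the right reader each time the right side is exhausted. A plain-Java sketch of the same iteration order over two in-memory lists (the sample data is arbitrary):

// NestedLoopSketch.java (illustration only, not part of the job)
import java.util.Arrays;
import java.util.List;

public class NestedLoopSketch {
    public static void main(String[] args) {
        List<String> left = Arrays.asList("night", "light");   // records of the left split
        List<String> right = Arrays.asList("nacht", "nicht");  // records of the right split

        // Same order in which CartesianRecordReader emits pairs:
        // the left record advances only after the right side is exhausted.
        for (String l : left)
            for (String r : right)
                System.out.println(l + " " + r);   // what getCurrentValue() would return
    }
}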
// Jaccard.java (map-only computation of the bigram Jaccard similarity for each word pair)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class Jaccard {
    public static final String THRESHOLD = "threshold";
    public static final String GRAMSIZE = "gramsize";

    public static class Map extends Mapper<Object, Text, DoubleWritable, Text> {
        private double threshold = 0.85;
        private int gramsize = 2;
        private DoubleWritable d = new DoubleWritable();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            threshold = Double.parseDouble(conf.get(Jaccard.THRESHOLD, "0.85"));
            gramsize = Integer.parseUnsignedInt(conf.get(Jaccard.GRAMSIZE, "2"));
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // The value produced by CartesianRecordReader is "leftWord rightWord".
            String content = value.toString().trim();
            String[] words = content.split(" ");

            // Collect the n-grams (bigrams by default) of both words.
            String leftWord = words[0];
            Set<String> leftSet = new HashSet<>();
            for (int i = gramsize; i <= leftWord.length(); i++)
                leftSet.add(leftWord.substring(i - gramsize, i));

            String rightWord = words[1];
            Set<String> rightSet = new HashSet<>();
            for (int i = gramsize; i <= rightWord.length(); i++)
                rightSet.add(rightWord.substring(i - gramsize, i));

            Set<String> union = new HashSet<>(leftSet);
            union.addAll(rightSet);
            Set<String> intersection = new HashSet<>(leftSet);
            intersection.retainAll(rightSet);

            int intersectionSize = intersection.size();
            int unionSize = union.size();
            if (intersectionSize != 0 && unionSize != 0) {
                // Jaccard coefficient: |intersection| / |union|.
                double result = (double) intersectionSize / (double) unionSize;
                // Emit only sufficiently similar pairs and skip identical words (similarity 1.0).
                if (result > threshold && result < 1.0 - 10E-12) {
                    d.set(result);
                    context.write(d, value);
                }
            }
        }
    }
}
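For reference, the same bigram Jaccard computation the mapper performs, as a standalone sketch outside Hadoop (the word pair is arbitrary):

// JaccardSketch.java (illustration only, not part of the job)
import java.util.HashSet;
import java.util.Set;

public class JaccardSketch {
    public static void main(String[] args) {
        int gramsize = 2;
        String left = "night", right = "nacht";

        Set<String> leftSet = new HashSet<>();
        for (int i = gramsize; i <= left.length(); i++)
            leftSet.add(left.substring(i - gramsize, i));       // {ni, ig, gh, ht}

        Set<String> rightSet = new HashSet<>();
        for (int i = gramsize; i <= right.length(); i++)
            rightSet.add(right.substring(i - gramsize, i));     // {na, ac, ch, ht}

        Set<String> union = new HashSet<>(leftSet);
        union.addAll(rightSet);                                 // 7 distinct bigrams
        Set<String> intersection = new HashSet<>(leftSet);
        intersection.retainAll(rightSet);                       // {ht}

        // Jaccard coefficient = |intersection| / |union| = 1 / 7 ≈ 0.143
        System.out.println((double) intersection.size() / union.size());
    }
}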