Skip to content

Instantly share code, notes, and snippets.

@ashwanthkumar
Created March 16, 2017 06:57
Show Gist options
  • Save ashwanthkumar/cb8cb24cecdeb9917ef27dfada8d9f70 to your computer and use it in GitHub Desktop.
Save ashwanthkumar/cb8cb24cecdeb9917ef27dfada8d9f70 to your computer and use it in GitHub Desktop.
package chaser.hadoop;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class UnionFind extends Configured implements Tool {
/** Job counters tallying how many records of each kind a task has handled. */
private enum Counter{
// Incremented for every record that is not labelled DISJOINT.
OPEN,
// Incremented for every record that is labelled DISJOINT.
DISJOINT
}
// Record key marking a set that PartitionReduce found to be disjoint;
// ElectMap drops such records and EmitDisjointMap keeps only these.
private static final Text DISJOINT = new Text("D");
// Record key marking a set that still has to go through an elect/partition round.
private static final Text OPEN = new Text("O");
/**
 * For potentially overlapping sets, elect a representative.
 *
 * <p>The representative R is the first element of the set in {@code Text}'s
 * natural ordering. Emits {@code <R, {Nodes}>} once and {@code <N, {R}>} for
 * every other N in Nodes. Records keyed DISJOINT are counted and dropped;
 * records whose set is empty are dropped without being counted.
 */
public static class ElectMap extends Mapper<Text, TextArrayWritable, Text, TextArrayWritable> {
    @Override
    protected void map(Text key, TextArrayWritable value, Context context)
            throws IOException, InterruptedException {
        // If it was a disjoint output from the last iteration, then don't
        // continue to propagate it.
        if (key.equals(DISJOINT)) {
            context.getCounter(Counter.DISJOINT).increment(1);
            return;
        }

        // Use a tree set so it's easier to find the smallest while uniquifying.
        TreeSet<Text> distinct = new TreeSet<Text>(Arrays.asList(value.get()));
        if (distinct.isEmpty()) {
            // Nothing to elect: pollFirst() would return null below and a
            // null key cannot be written.
            return;
        }
        context.getCounter(Counter.OPEN).increment(1);

        // Snapshot the full set BEFORE pollFirst() removes the representative.
        TextArrayWritable all = new TextArrayWritable(distinct);
        Text representative = distinct.pollFirst();
        TextArrayWritable representativeValue = new TextArrayWritable(representative);

        context.write(representative, all);
        // 'distinct' now holds every member except the representative.
        for (Text other : distinct) {
            context.write(other, representativeValue);
        }
    }
}
/**
 * Merges every array received for a key into one sorted, duplicate-free
 * array and emits it under that same key.
 */
public static class ElectReduce extends Reducer<Text, TextArrayWritable, Text, TextArrayWritable> {
    @Override
    protected void reduce(Text key, Iterable<TextArrayWritable> values, Context context)
            throws IOException, InterruptedException {
        TreeSet<Text> merged = new TreeSet<Text>();
        for (TextArrayWritable incoming : values) {
            for (Text member : incoming.get()) {
                merged.add(member);
            }
        }
        TextArrayWritable result = new TextArrayWritable(merged);
        context.write(key, result);
    }
}
/**
 * Routes each record one of two ways: a record whose value holds exactly one
 * element is swapped (the element becomes the key, the old key becomes the
 * single-element value); every other record is forwarded unchanged.
 */
public static class PartitionMap extends Mapper<Text, TextArrayWritable, Text, TextArrayWritable> {
    @Override
    protected void map(Text key, TextArrayWritable value, Context context)
            throws IOException, InterruptedException {
        Text[] members = value.get();
        if (members.length != 1) {
            // Representative pass through
            context.write(key, value);
            return;
        }
        // Constituent swap
        context.write(members[0], new TextArrayWritable(key));
    }
}
/**
 * Count the number of constituents, and label the set as DISJOINT if each
 * element appears twice.
 *
 * <p>The key is counted once up front, then every element of every incoming
 * array is counted. The distinct elements are re-emitted as one array, keyed
 * DISJOINT when every count is exactly 2 and OPEN otherwise. The matching
 * job counter is incremented once per emitted record.
 */
public static class PartitionReduce extends Reducer<Text, TextArrayWritable, Text, TextArrayWritable> {
    @Override
    protected void reduce(Text key, Iterable<TextArrayWritable> values, Context context)
            throws IOException, InterruptedException {
        HashMap<Text, Integer> counts = new HashMap<Text, Integer>();
        // Inject a 1 for the key, so it counts itself twice. A copy is stored
        // because the key instance belongs to the framework and should not be
        // retained as a map key.
        counts.put(new Text(key), 1);
        for (TextArrayWritable value : values) {
            for (Text text : value.get()) {
                Integer seen = counts.get(text);
                counts.put(text, seen == null ? 1 : seen + 1);
            }
        }

        // Assume it's DISJOINT until we see an odd man.
        boolean disjoint = true;
        for (Integer count : counts.values()) {
            if (count != 2) {
                disjoint = false;
                break;
            }
        }

        Text label;
        if (disjoint) {
            label = DISJOINT;
            context.getCounter(Counter.DISJOINT).increment(1);
        } else {
            label = OPEN;
            context.getCounter(Counter.OPEN).increment(1);
        }
        context.write(label, new TextArrayWritable(counts.keySet()));
    }
}
/**
 * Simple pass that tags every incoming record with the OPEN key, leaving the
 * value untouched.
 */
public static class MarkOpenMap extends Mapper<Writable, TextArrayWritable, Text, TextArrayWritable> {
    @Override
    protected void map(Writable key, TextArrayWritable value, Context context)
            throws IOException, InterruptedException {
        // The incoming key is discarded; only the value is carried forward.
        final Text tag = OPEN;
        context.write(tag, value);
    }
}
/**
 * Filter pass: forwards records keyed DISJOINT unchanged and drops
 * everything else.
 */
public static class EmitDisjointMap extends Mapper<Text, TextArrayWritable, Text, TextArrayWritable> {
    @Override
    protected void map(Text key, TextArrayWritable value, Context context)
            throws IOException, InterruptedException {
        if (!key.equals(DISJOINT)) {
            // Not a finished set; nothing to emit.
            return;
        }
        context.write(key, value);
    }
}
/**
 * Creates a uniquely named working directory under {@code /tmp/union_find/}
 * on the file system the path resolves to with this tool's configuration,
 * and registers it for deletion on exit.
 *
 * @return the path of the new directory, as a string
 * @throws IOException if the file system cannot be obtained or the directory
 *     cannot be created
 */
private String makeTempSpace() throws IOException {
    String temporary = "/tmp/union_find/" + UUID.randomUUID();
    Path tempPath = new Path(temporary);
    FileSystem fs = tempPath.getFileSystem(getConf());
    // mkdirs reports failure through its return value; do not continue with
    // a working directory that does not exist.
    if (!fs.mkdirs(tempPath)) {
        throw new IOException("Could not create temporary directory " + tempPath);
    }
    fs.deleteOnExit(tempPath);
    return temporary;
}
/**
 * Runs the whole pipeline: a setup pass that marks every input record OPEN,
 * then alternating elect/partition jobs until an elect pass counts no OPEN
 * record, then a final pass that emits every DISJOINT record.
 *
 * <p>This assumes some prior data setup. Specifically, the input must be in
 * a sequence file of {@code <K, TextArrayWritable>}.
 *
 * @param args optional positional arguments: {@code args[0]} is used as the
 *     input path of the setup job and {@code args[1]} as the output path of
 *     the final emit job. When an argument is absent the corresponding path
 *     is not set here and has to come from the job configuration.
 * @return 0 if the final emit job succeeded, 1 otherwise
 * @throws IllegalStateException if the setup job or an elect/partition job fails
 */
@Override
public int run(String[] args) throws Exception {
    // Create a temporary work location that gets cleaned up on exit.
    String temporary = makeTempSpace();
    String electPath = temporary + "/elect.";
    String partitionPath = temporary + "/partition.";
    int iteration = 0;

    // If IO is very important, the setup job could be optimized away by
    // tacking the mapper onto the first iteration of the loop below with a
    // ChainMapper.
    Job setup = new Job(getConf());
    setup.setJarByClass(getClass());
    setup.setName("Union Find (setup)");
    setup.setMapperClass(MarkOpenMap.class);
    setup.setNumReduceTasks(0);
    setup.setInputFormatClass(SequenceFileInputFormat.class);
    setup.setOutputFormatClass(SequenceFileOutputFormat.class);
    setup.setOutputKeyClass(Text.class);
    setup.setOutputValueClass(TextArrayWritable.class);
    if (args != null && args.length > 0) {
        FileInputFormat.setInputPaths(setup, new Path(args[0]));
    }
    FileOutputFormat.setOutputPath(setup, new Path(partitionPath + iteration));
    runOrFail(setup);

    while (true) {
        // Stitch together paths
        // partition.n => elect => elect.(n+1) => partition => partition.(n+1)
        String openSets = partitionPath + iteration;
        iteration++;
        String elected = electPath + iteration;
        String partitioned = partitionPath + iteration;

        Job elect = new Job(new Configuration(getConf()));
        elect.setJarByClass(getClass());
        elect.setName("Union Find (elect [" + iteration + "])");
        elect.setMapperClass(ElectMap.class);
        elect.setReducerClass(ElectReduce.class);
        configureIntermediate(elect, openSets, elected);
        runOrFail(elect);

        // All the sets were disjoint. No more work to do.
        // Otherwise, run partition and repeat.
        if (elect.getCounters().findCounter(Counter.OPEN).getValue() == 0) {
            break;
        }

        Job partition = new Job(new Configuration(getConf()));
        partition.setJarByClass(getClass());
        partition.setName("Union Find (partition [" + iteration + "])");
        partition.setMapperClass(PartitionMap.class);
        partition.setReducerClass(PartitionReduce.class);
        configureIntermediate(partition, elected, partitioned);
        runOrFail(partition);
    }

    // Collect all the disjoint values from every partition output.
    Job emit = new Job(getConf());
    emit.setJarByClass(getClass());
    emit.setName("Union Find (emit)");
    emit.setMapperClass(EmitDisjointMap.class);
    emit.setNumReduceTasks(0);
    emit.setInputFormatClass(SequenceFileInputFormat.class);
    emit.setOutputKeyClass(Text.class);
    emit.setOutputValueClass(TextArrayWritable.class);
    // Glob over partition.0 .. partition.n
    FileInputFormat.setInputPaths(emit, new Path(partitionPath + '*'));
    if (args != null && args.length > 1) {
        FileOutputFormat.setOutputPath(emit, new Path(args[1]));
    }
    emit.waitForCompletion(true);
    return emit.isSuccessful() ? 0 : 1;
}

/** Wires a job to read and write sequence files of Text / TextArrayWritable at the given paths. */
private static void configureIntermediate(Job job, String input, String output)
        throws IOException {
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TextArrayWritable.class);
    FileInputFormat.setInputPaths(job, new Path(input));
    FileOutputFormat.setOutputPath(job, new Path(output));
}

/** Runs a job to completion (quietly) and throws if it did not succeed. */
private static void runOrFail(Job job)
        throws IOException, InterruptedException, ClassNotFoundException {
    if (!job.waitForCompletion(false)) {
        throw new IllegalStateException("Job failed: " + job.getJobName());
    }
}
/**
 * Command-line entry point: runs a new UnionFind through ToolRunner and
 * exits the JVM with the value it returns.
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new UnionFind(), args));
}
public static class TextArrayWritable extends ArrayWritable {
public TextArrayWritable() {
super(Text.class);
}
public TextArrayWritable(Text... elements) {
super(elements);
}
public TextArrayWritable(Collection<Text> elements {
super( elements.toArray(new Text[0]);
}
public Text[] get() {
Writable[] writables = super.get();
Text[] texts = new Text[writables.length];
for(int i=0; i<writables.length; ++i)
texts[i] = (Text)writables[i];
return texts;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment