-
-
Save catap/3aabd33b86c0fc5620aa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1,a
1,a
2,f
1,b
2,a
2,d
1,c
1,b
2,d
1,z
2,e
2,e
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 aabbcz
2 addeef
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.base.Joiner; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.io.WritableComparator; | |
import org.apache.hadoop.io.WritableUtils; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.util.ToolRunner; | |
import org.apache.mahout.common.AbstractJob; | |
import org.apache.mahout.common.HadoopUtil; | |
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
public class SortValueDriver extends AbstractJob { | |
static void run(Configuration conf, Path input, Path output, boolean verbose) throws IOException, InterruptedException, ClassNotFoundException { | |
HadoopUtil.delete(conf, output); | |
Job job = new Job(conf, "Sorting by values and concatenating it over input: " + input); | |
job.setJarByClass(SortValueDriver.class); | |
job.setInputFormatClass(TextInputFormat.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
FileInputFormat.addInputPath(job, input); | |
FileOutputFormat.setOutputPath(job, output); | |
job.setMapOutputKeyClass(SortValueKV.class); | |
job.setMapOutputValueClass(Text.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(Text.class); | |
job.setMapperClass(SortValueMapper.class); | |
job.setReducerClass(SortValueReducer.class); | |
job.setSortComparatorClass(SortValueComparator.class); | |
job.setGroupingComparatorClass(SortValueGrouping.class); | |
job.waitForCompletion(verbose); | |
} | |
public static class SortValueKV implements WritableComparable<SortValueKV> { | |
private Text key = new Text(); | |
private Text value = new Text(); | |
public Text getKey() { | |
return key; | |
} | |
public Text getValue() { | |
return value; | |
} | |
@Override | |
public void write(DataOutput out) throws IOException { | |
WritableUtils.writeString(out, key.toString()); | |
WritableUtils.writeString(out, value.toString()); | |
} | |
@Override | |
public void readFields(DataInput in) throws IOException { | |
key.set(WritableUtils.readString(in)); | |
value.set(WritableUtils.readString(in)); | |
} | |
@Override | |
public int compareTo(SortValueKV sortValueKV) { | |
return key.compareTo(sortValueKV.getKey()); | |
} | |
@Override | |
public int hashCode() { | |
return key.hashCode(); | |
} | |
} | |
public static class SortValueComparator extends WritableComparator { | |
protected SortValueComparator() { | |
super(SortValueKV.class, true); | |
} | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
SortValueKV kv1 = (SortValueKV) w1; | |
SortValueKV kv2 = (SortValueKV) w2; | |
int res = kv1.compareTo(kv2); | |
if (res != 0) { | |
return res; | |
} | |
return kv1.getValue().compareTo(kv2.getValue()); | |
} | |
} | |
public static class SortValueGrouping extends WritableComparator { | |
protected SortValueGrouping() { | |
super(SortValueKV.class, true); | |
} | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
SortValueKV kv1 = (SortValueKV) w1; | |
SortValueKV kv2 = (SortValueKV) w2; | |
return kv1.compareTo(kv2); | |
} | |
} | |
static class SortValueMapper extends Mapper<LongWritable, Text, SortValueKV, Text> { | |
private SortValueKV kv = new SortValueKV(); | |
private Text value = new Text(); | |
@Override | |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { | |
int sep = value.find(","); | |
kv.getKey().set(value.getBytes(), 0, sep); | |
// skip broken lines | |
if (sep < value.getLength()) { | |
kv.getValue().set(value.getBytes(), sep + 1, value.getLength() - (sep + 1)); | |
this.value.set(kv.getValue()); | |
context.write(kv, this.value); | |
} | |
} | |
} | |
static class SortValueReducer extends Reducer<SortValueKV, Text, Text, Text> { | |
private Joiner joiner; | |
private Text value = new Text(); | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
joiner = Joiner.on(""); | |
} | |
@Override | |
protected void reduce(SortValueKV key, Iterable<Text> values, Context context) throws IOException, InterruptedException { | |
this.value.set(joiner.join(values)); | |
context.write(key.getKey(), this.value); | |
} | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
addInputOption(); | |
addOutputOption(); | |
addFlag("verbose", "v", "verbose output"); | |
if (parseArguments(args) == null) { | |
return -1; | |
} | |
run(getConf(), getInputPath(), getOutputPath(), hasOption("verbose")); | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
ToolRunner.run(new Configuration(), new SortValueDriver(), args); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment