Skip to content

Instantly share code, notes, and snippets.

@catap
Created November 27, 2013 18:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save catap/3aabd33b86c0fc5620aa to your computer and use it in GitHub Desktop.
Save catap/3aabd33b86c0fc5620aa to your computer and use it in GitHub Desktop.
1,a
1,a
2,f
1,b
2,a
2,d
1,c
1,b
2,d
1,z
2,e
2,e
1 aabbcz
2 addeef
import com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SortValueDriver extends AbstractJob {
/**
 * Configures and runs the MapReduce job that groups input lines by key and
 * concatenates each key's values in sorted order.
 *
 * <p>{@code HadoopUtil.delete} is called on {@code output} before the job is
 * set up (presumably so a stale output directory does not block the run).
 *
 * @param conf    Hadoop configuration used to create the job
 * @param input   path of the text input; each line is expected as {@code key,value}
 * @param output  path the text output is written to
 * @param verbose passed to {@code Job.waitForCompletion} to control progress reporting
 * @throws IllegalStateException if the job completes unsuccessfully
 */
static void run(Configuration conf, Path input, Path output, boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
    HadoopUtil.delete(conf, output);

    Job job = new Job(conf, "Sorting by values and concatenating it over input: " + input);
    job.setJarByClass(SortValueDriver.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);

    // The map output key is the composite (key, value) pair so that the
    // framework's sort can order values within a key.
    job.setMapOutputKeyClass(SortValueKV.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(SortValueMapper.class);
    job.setReducerClass(SortValueReducer.class);

    // Sort on (key, value), but group reducer calls on key only.
    job.setSortComparatorClass(SortValueComparator.class);
    job.setGroupingComparatorClass(SortValueGrouping.class);

    // Do not ignore the outcome: a failed job must not look like a success.
    if (!job.waitForCompletion(verbose)) {
        throw new IllegalStateException("Job failed for input: " + input);
    }
}
/**
 * Composite map-output key holding both the natural key and the value of a
 * record, so that values can take part in the shuffle sort.
 *
 * <p>{@link #compareTo}, {@link #equals} and {@link #hashCode} all look at the
 * key part only; ordering by value is left to {@code SortValueComparator}.
 */
public static class SortValueKV implements WritableComparable<SortValueKV> {
    private final Text key = new Text();
    private final Text value = new Text();

    /** Returns the mutable key part of this pair. */
    public Text getKey() {
        return key;
    }

    /** Returns the mutable value part of this pair. */
    public Text getValue() {
        return value;
    }

    /** Writes the key and then the value, each via {@code WritableUtils.writeString}. */
    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, key.toString());
        WritableUtils.writeString(out, value.toString());
    }

    /** Reads the key and then the value, in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        key.set(WritableUtils.readString(in));
        value.set(WritableUtils.readString(in));
    }

    /** Compares by the key part only. */
    @Override
    public int compareTo(SortValueKV sortValueKV) {
        return key.compareTo(sortValueKV.getKey());
    }

    /**
     * Two pairs are equal when their key parts are equal, which keeps
     * {@code equals} consistent with both {@link #hashCode} and {@link #compareTo}.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SortValueKV)) {
            return false;
        }
        return key.equals(((SortValueKV) other).key);
    }

    /** Hashes the key part only, so all pairs sharing a key share a hash code. */
    @Override
    public int hashCode() {
        return key.hashCode();
    }
}
/**
 * Sort comparator for {@code SortValueKV}: orders by key first and, when the
 * keys compare equal, by value.
 */
public static class SortValueComparator extends WritableComparator {
    protected SortValueComparator() {
        super(SortValueKV.class, true);
    }

    /**
     * @param w1 first pair; must be a {@code SortValueKV}
     * @param w2 second pair; must be a {@code SortValueKV}
     * @return the key comparison result, or the value comparison result when keys tie
     */
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        SortValueKV left = (SortValueKV) w1;
        SortValueKV right = (SortValueKV) w2;
        int byKey = left.compareTo(right);
        // Values only break ties between identical keys.
        return byKey == 0 ? left.getValue().compareTo(right.getValue()) : byKey;
    }
}
/**
 * Grouping comparator for {@code SortValueKV}: compares by key only (via
 * {@code SortValueKV.compareTo}), ignoring the value part.
 */
public static class SortValueGrouping extends WritableComparator {
    protected SortValueGrouping() {
        super(SortValueKV.class, true);
    }

    /**
     * @param w1 first pair; must be a {@code SortValueKV}
     * @param w2 second pair; must be a {@code SortValueKV}
     * @return the result of comparing the two pairs' keys
     */
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        return ((SortValueKV) w1).compareTo((SortValueKV) w2);
    }
}
/**
 * Splits each input line at its first comma into a key and a value, and emits
 * the composite {@code SortValueKV} as the map key with the value as the map value.
 */
static class SortValueMapper extends Mapper<LongWritable, Text, SortValueKV, Text> {
    // Reused across map() calls to avoid allocating per record.
    private final SortValueKV kv = new SortValueKV();
    private final Text value = new Text();

    /**
     * @param key     input key supplied by the input format (unused)
     * @param value   one input line, expected as {@code key,value}
     * @param context sink for the emitted pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        int sep = value.find(",");
        // skip broken lines: find() yields -1 when there is no separator, and
        // using that as a length would fail instead of skipping the line.
        if (sep < 0) {
            return;
        }
        // sep is a byte offset into the line's backing buffer.
        kv.getKey().set(value.getBytes(), 0, sep);
        kv.getValue().set(value.getBytes(), sep + 1, value.getLength() - (sep + 1));
        this.value.set(kv.getValue());
        context.write(kv, this.value);
    }
}
/**
 * Concatenates all values of a group, in the order they are iterated, with no
 * separator, and emits the result under the group's key.
 */
static class SortValueReducer extends Reducer<SortValueKV, Text, Text, Text> {
    private Joiner joiner;
    // Reused output holder for the concatenated values.
    private Text joined = new Text();

    /** Creates the empty-separator joiner used by {@code reduce}. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        joiner = Joiner.on("");
    }

    /**
     * @param key     composite key of the group; only its key part is emitted
     * @param values  values belonging to the group
     * @param context sink for the (key, concatenated values) record
     */
    @Override
    protected void reduce(SortValueKV key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String concatenated = joiner.join(values);
        joined.set(concatenated);
        Text groupKey = key.getKey();
        context.write(groupKey, joined);
    }
}
/**
 * Command-line entry point: registers the input, output and verbose options,
 * parses {@code args} and runs the job with the parsed settings.
 *
 * @param args command-line arguments
 * @return {@code 0} after the job has been run, {@code -1} when
 *         {@code parseArguments} returns {@code null}
 */
@Override
public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    addFlag("verbose", "v", "verbose output");

    boolean parsed = parseArguments(args) != null;
    if (parsed) {
        run(getConf(), getInputPath(), getOutputPath(), hasOption("verbose"));
        return 0;
    }
    return -1;
}
/**
 * Launches the driver through {@code ToolRunner} and exits the JVM with the
 * status the tool returned, so callers (shells, schedulers) can detect failure.
 *
 * @param args command-line arguments forwarded to the tool
 */
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new SortValueDriver(), args);
    System.exit(exitCode);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment