Skip to content

Instantly share code, notes, and snippets.

@hishidama
Created March 26, 2012 12:52
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hishidama/2204853 to your computer and use it in GitHub Desktop.
Save hishidama/2204853 to your computer and use it in GitHub Desktop.
Hadoopで1から100までの合計を算出するプログラム
package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* <a href="http://togetter.com/li/278731">1から100までの和を表示するプログラムを作れ</a>
*/
public class Sum100 extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new Sum100(), args);
System.exit(r);
}
@Override
public int run(String[] args) throws Exception {
Path input = new Path("sum100/input");
Path output = new Path("sum100/output");
init(input);
FileSystem fs = output.getFileSystem(getConf());
try {
fs.delete(output, true);
return submit(fs, input, output);
} finally {
fs.close();
}
}
void init(Path input) throws IOException {
FileSystem fs = input.getFileSystem(getConf());
try {
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
getConf(), input, NullWritable.class, IntWritable.class,
CompressionType.NONE);
try {
NullWritable key = NullWritable.get();
IntWritable val = new IntWritable();
for (int x = 1; x <= 100; x++) {
val.set(x);
writer.append(key, val);
}
} finally {
writer.close();
}
} finally {
fs.close();
}
}
int submit(FileSystem fs, Path input, Path output) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(getConf(), "sum100");
job.setJarByClass(Sum100.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Mapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
boolean succeeded = job.waitForCompletion(true);
if (succeeded) {
print(fs, output);
return 0;
} else {
return 1;
}
}
void print(FileSystem fs, Path output) throws IOException {
PathFilter filter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith("part-");
}
};
for (FileStatus s : fs.listStatus(output, filter)) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
s.getPath(), getConf());
try {
NullWritable key = NullWritable.get();
IntWritable val = new IntWritable();
while (reader.next(key, val)) {
System.out.println(val.get());
}
} finally {
reader.close();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment