@hishidama
Created March 27, 2012 13:26
A program that computes the sum of 1 to 100 using Hadoop Counters
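Two self-contained variants follow, each a complete Tool. Sum100Counter1 has its mapper emit the input value N as N output records, so the framework's built-in MAP_OUTPUT_RECORDS counter ends up holding 1 + 2 + ... + 100; Sum100Counter2 instead adds each value directly to a user-defined counter. Assuming the classes are packaged into a jar (the name sum100.jar below is hypothetical), either variant runs in the usual way:

hadoop jar sum100.jar example.Sum100Counter1
hadoop jar sum100.jar example.Sum100Counter2

Both print 5050 on success.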
Sum100Counter1.java:

package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * <a href="http://togetter.com/li/278731">Write a program that displays the sum of 1 to 100</a>.
 * <p>
 * Approach that uses the built-in counter of Map output records.
 * </p>
 */
public class Sum100Counter1 extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new Sum100Counter1(), args);
System.exit(r);
}
@Override
public int run(String[] args) throws Exception {
Path input = new Path("sum100/input");
Path output = new Path("sum100/output");
init(input);
FileSystem fs = output.getFileSystem(getConf());
try {
fs.delete(output, true);
return submit(fs, input, output);
} finally {
fs.close();
}
}
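/**
 * Writes the integers 1 to 100 into an uncompressed SequenceFile
 * (NullWritable key, IntWritable value) that serves as the job input.
 */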
void init(Path input) throws IOException {
FileSystem fs = input.getFileSystem(getConf());
try {
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
getConf(), input, NullWritable.class, IntWritable.class,
CompressionType.NONE);
try {
NullWritable key = NullWritable.get();
IntWritable val = new IntWritable();
for (int x = 1; x <= 100; x++) {
val.set(x);
writer.append(key, val);
}
} finally {
writer.close();
}
} finally {
fs.close();
}
}
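/**
 * Configures and runs a map-only job (zero reduce tasks) over the
 * SequenceFile input, then prints the counter on success.
 */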
int submit(FileSystem fs, Path input, Path output) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(getConf(), "sum100");
job.setJarByClass(getClass());
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
boolean succeeded = job.waitForCompletion(true);
if (succeeded) {
print(job);
return 0;
} else {
return 1;
}
}
public static class Map extends
Mapper<NullWritable, IntWritable, NullWritable, IntWritable> {
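/**
 * Emits the input value N as N identical output records, so the
 * framework's MAP_OUTPUT_RECORDS counter accumulates 1 + 2 + ... + 100.
 */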
@Override
protected void map(NullWritable key, IntWritable value, Context context)
throws IOException, InterruptedException {
for (int i = value.get(); i > 0; i--) {
context.write(key, value);
}
}
}
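/**
 * Reads the job's aggregated MAP_OUTPUT_RECORDS counter, which now
 * equals the sum of 1 to 100, and prints it.
 */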
void print(Job job) throws IOException {
Counters counters = job.getCounters();
Counter c = counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS);
System.out.println(c.getValue());
}
}
Sum100Counter2.java:

package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * <a href="http://togetter.com/li/278731">Write a program that displays the sum of 1 to 100</a>.
 * <p>
 * Approach that uses a user-defined (custom) counter.
 * </p>
 */
public class Sum100Counter2 extends Configured implements Tool {
public static enum MyCounter {
SUM
}
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new Sum100Counter2(), args);
System.exit(r);
}
@Override
public int run(String[] args) throws Exception {
Path input = new Path("sum100/input");
Path output = new Path("sum100/output");
init(input);
FileSystem fs = output.getFileSystem(getConf());
try {
fs.delete(output, true);
return submit(fs, input, output);
} finally {
fs.close();
}
}
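/**
 * Writes the integers 1 to 100 into an uncompressed SequenceFile
 * (NullWritable key, IntWritable value) that serves as the job input.
 */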
void init(Path input) throws IOException {
FileSystem fs = input.getFileSystem(getConf());
try {
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
getConf(), input, NullWritable.class, IntWritable.class,
CompressionType.NONE);
try {
NullWritable key = NullWritable.get();
IntWritable val = new IntWritable();
for (int x = 1; x <= 100; x++) {
val.set(x);
writer.append(key, val);
}
} finally {
writer.close();
}
} finally {
fs.close();
}
}
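/**
 * Configures and runs a map-only job (zero reduce tasks) over the
 * SequenceFile input, then prints the counter on success.
 */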
int submit(FileSystem fs, Path input, Path output) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(getConf(), "sum100");
job.setJarByClass(getClass());
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
boolean succeeded = job.waitForCompletion(true);
if (succeeded) {
print(job);
return 0;
} else {
return 1;
}
}
public static class Map extends
Mapper<NullWritable, IntWritable, NullWritable, IntWritable> {
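/** Handle to the custom SUM counter, looked up once per task in setup(). */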
protected Counter counter;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
counter = context.getCounter(MyCounter.SUM);
}
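/**
 * Adds each input value directly to the custom counter; no records are
 * emitted, so the job output itself stays empty.
 */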
@Override
protected void map(NullWritable key, IntWritable value, Context context)
throws IOException, InterruptedException {
counter.increment(value.get());
}
}
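/**
 * Reads the aggregated custom SUM counter from the completed job and
 * prints it.
 */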
void print(Job job) throws IOException {
Counters counters = job.getCounters();
Counter c = counters.findCounter(MyCounter.SUM);
System.out.println(c.getValue());
}
}
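For reference, the value both jobs report can be checked against the closed form n(n+1)/2. A minimal standalone sketch (no Hadoop dependency; the class name is illustrative):

public class Sum100Check {
    public static void main(String[] args) {
        int n = 100;
        // Gauss's closed form for 1 + 2 + ... + n; both variants above should print this.
        System.out.println(n * (n + 1) / 2); // 5050
    }
}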