A Hadoop program that computes the sum of 1 to 100 using a Counter
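The gist contains two variants of the same idea: the driver writes the integers 1 to 100 into a SequenceFile, runs a map-only job over it, and prints a job counter afterwards. Sum100Counter1 derives the sum from the framework's map output record counter, while Sum100Counter2 accumulates it in a user-defined counter. Either way the expected output is 5050 (= 100 × 101 / 2). The first file, Sum100Counter1.java: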
package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <a href="http://togetter.com/li/278731">Write a program that displays the sum of 1 to 100</a>.
 * <p>
 * Variant that uses the map output record counter.
 * </p>
 */
public class Sum100Counter1 extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int r = ToolRunner.run(new Sum100Counter1(), args);
        System.exit(r);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path input = new Path("sum100/input");
        Path output = new Path("sum100/output");

        init(input);

        FileSystem fs = output.getFileSystem(getConf());
        try {
            fs.delete(output, true);
            return submit(fs, input, output);
        } finally {
            fs.close();
        }
    }

    // Writes the integers 1..100 into a SequenceFile that serves as the job input.
    void init(Path input) throws IOException {
        FileSystem fs = input.getFileSystem(getConf());
        try {
            SequenceFile.Writer writer = SequenceFile.createWriter(fs,
                    getConf(), input, NullWritable.class, IntWritable.class,
                    CompressionType.NONE);
            try {
                NullWritable key = NullWritable.get();
                IntWritable val = new IntWritable();
                for (int x = 1; x <= 100; x++) {
                    val.set(x);
                    writer.append(key, val);
                }
            } finally {
                writer.close();
            }
        } finally {
            fs.close();
        }
    }

    int submit(FileSystem fs, Path input, Path output) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job job = new Job(getConf(), "sum100");
        job.setJarByClass(getClass());
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0); // map-only job
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            print(job);
            return 0;
        } else {
            return 1;
        }
    }

    public static class Map extends
            Mapper<NullWritable, IntWritable, NullWritable, IntWritable> {

        @Override
        protected void map(NullWritable key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit the input value n as n separate records, so the total number
            // of map output records becomes 1 + 2 + ... + 100.
            for (int i = value.get(); i > 0; i--) {
                context.write(key, value);
            }
        }
    }

    // Prints the framework's map output record counter, i.e. the computed sum.
    void print(Job job) throws IOException {
        Counters counters = job.getCounters();
        Counter c = counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS);
        System.out.println(c.getValue());
    }
}
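Note that this variant still materializes all 5050 dummy records in sum100/output; only the counter lookup produces the printed value. The lookup goes through org.apache.hadoop.mapred.Task.Counter, the old-API enum that matches Hadoop 0.20/1.x. On Hadoop 2.x and later the same counter is exposed as org.apache.hadoop.mapreduce.TaskCounter; below is a minimal sketch, not part of the gist, of the equivalent lookup under that assumption.

package example;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

// Hypothetical helper, not in the original gist: the same counter lookup
// written against the Hadoop 2.x API, where TaskCounter replaces Task.Counter.
class MapOutputRecordsPrinter {
    static void print(Job job) throws IOException {
        Counter c = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
        System.out.println(c.getValue()); // 5050 for this job
    }
}

The second file, Sum100Counter2.java, avoids the dummy output records entirely: each mapper adds its input values to a user-defined counter instead.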
package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <a href="http://togetter.com/li/278731">Write a program that displays the sum of 1 to 100</a>.
 * <p>
 * Variant that uses a custom counter.
 * </p>
 */
public class Sum100Counter2 extends Configured implements Tool {

    // User-defined counter that accumulates the sum.
    public static enum MyCounter {
        SUM
    }

    public static void main(String[] args) throws Exception {
        int r = ToolRunner.run(new Sum100Counter2(), args);
        System.exit(r);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path input = new Path("sum100/input");
        Path output = new Path("sum100/output");

        init(input);

        FileSystem fs = output.getFileSystem(getConf());
        try {
            fs.delete(output, true);
            return submit(fs, input, output);
        } finally {
            fs.close();
        }
    }

    // Writes the integers 1..100 into a SequenceFile that serves as the job input.
    void init(Path input) throws IOException {
        FileSystem fs = input.getFileSystem(getConf());
        try {
            SequenceFile.Writer writer = SequenceFile.createWriter(fs,
                    getConf(), input, NullWritable.class, IntWritable.class,
                    CompressionType.NONE);
            try {
                NullWritable key = NullWritable.get();
                IntWritable val = new IntWritable();
                for (int x = 1; x <= 100; x++) {
                    val.set(x);
                    writer.append(key, val);
                }
            } finally {
                writer.close();
            }
        } finally {
            fs.close();
        }
    }

    int submit(FileSystem fs, Path input, Path output) throws IOException,
            InterruptedException, ClassNotFoundException {
        Job job = new Job(getConf(), "sum100");
        job.setJarByClass(getClass());
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0); // map-only job
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            print(job);
            return 0;
        } else {
            return 1;
        }
    }

    public static class Map extends
            Mapper<NullWritable, IntWritable, NullWritable, IntWritable> {

        protected Counter counter;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            counter = context.getCounter(MyCounter.SUM);
        }

        @Override
        protected void map(NullWritable key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            // Add each input value to the custom counter; the framework aggregates
            // counter values across all map tasks.
            counter.increment(value.get());
        }
    }

    // Prints the aggregated value of the custom counter, i.e. the computed sum.
    void print(Job job) throws IOException {
        Counters counters = job.getCounters();
        Counter c = counters.findCounter(MyCounter.SUM);
        System.out.println(c.getValue());
    }
}
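Sum100Counter2 never calls context.write, so no records land in sum100/output; the result travels only through MyCounter.SUM, which the framework sums across all map tasks. Both classes can be submitted in the usual way with hadoop jar, or, for a quick check, driven in-process. A minimal launcher sketch, not part of the gist, assuming an unconfigured Hadoop client (local job runner, local file system, relative paths resolved under the working directory):

package example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical launcher, not in the original gist: runs both variants back to back
// with the default (local) configuration. Each run prints its counter value, 5050.
public class Sum100Local {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int r1 = ToolRunner.run(conf, new Sum100Counter1(), args);
        int r2 = ToolRunner.run(conf, new Sum100Counter2(), args);
        System.exit(r1 != 0 ? r1 : r2);
    }
}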