Skip to content

Instantly share code, notes, and snippets.

@uzresk
Last active August 29, 2015 14:09
Show Gist options
  • Save uzresk/81f7dab8bfda6b49dfaf to your computer and use it in GitHub Desktop.
AWS EMRを動かしてみよう。 ref: http://qiita.com/uzresk/items/76ba0c9700e1d78fe5e3
package aws.emr.accesslog;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class AccessLog {

    /**
     * Driver for the access-log URL counting MapReduce job.
     *
     * <p>Mapper emits {@code <url, 1>} per request line; reducer sums the
     * counts per URL.
     *
     * @param args args[0] = input path, args[1] = output path
     * @throws Exception if job configuration or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException on args[0]/args[1].
        if (args.length < 2) {
            System.err.println("Usage: AccessLog <input path> <output path>");
            System.exit(2);
        }

        Job job = new Job();
        job.setJarByClass(AccessLog.class);
        job.setJobName("AccessLog");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(AccessLogMapper.class);
        job.setReducerClass(AccessLogReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job success/failure via the exit code; the original
        // discarded the result and always exited 0, so a failed job looked
        // successful to the caller (e.g. an EMR step).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package aws.emr.accesslog;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AccessLogMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    /** Reused count value; every parsed request contributes 1. */
    private final static LongWritable one = new LongWritable(1);
    /** Reused output key to avoid per-record allocation. */
    private Text url = new Text();

    // Combined-log-format line:
    //   ip ident user [date] "request" status bytes "referer" "user-agent"
    // group(5) is the request line, e.g. "GET /path HTTP/1.1".
    private static final Pattern PATTERN = Pattern
            .compile("^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"");

    /**
     * Parses one access-log line and emits {@code <request path, 1>}.
     * Unparsable lines are logged and skipped.
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        Matcher matcher = PATTERN.matcher(line);
        if (!matcher.matches()) {
            // BUG FIX: must skip the record here. The original fell through
            // and called matcher.group(5) on a failed match, which throws
            // IllegalStateException and kills the whole task.
            System.err.println("no matches");
            return;
        }
        // Guard against a request line without a path token, which would
        // otherwise throw ArrayIndexOutOfBoundsException on [1].
        String[] request = matcher.group(5).split(" ");
        if (request.length < 2) {
            System.err.println("malformed request: " + matcher.group(5));
            return;
        }
        String path = request[1];
        System.out.println("URL:" + path);
        url.set(path);
        context.write(url, one);
    }
}
package aws.emr.accesslog;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import junit.framework.TestCase;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;
public class AccessLogMapperTest extends TestCase {

    private Mapper<LongWritable, Text, Text, LongWritable> mapper;
    private MapDriver<LongWritable, Text, Text, LongWritable> driver;

    @Before
    public void setUp() {
        mapper = new AccessLogMapper();
        driver = new MapDriver<LongWritable, Text, Text, LongWritable>(mapper);
    }

    /**
     * Feeds each line of the fixture log to the mapper and checks the
     * map-only output: one {@code <path, 1>} pair per line, in input order
     * (no aggregation happens at the map stage).
     */
    @Test
    public void testCount() throws IOException {
        BufferedReader br = new BufferedReader(new InputStreamReader(
                AccessLogMapperTest.class
                        .getResourceAsStream("/aws/emr/accesslog/access.log")));
        // FIX: close the reader; the original leaked it on every run.
        try {
            String line = null;
            while ((line = br.readLine()) != null) {
                driver.withInput(new LongWritable(1), new Text(line));
            }
        } finally {
            br.close();
        }
        driver.withOutput(new Text("/"), new LongWritable(1));
        driver.withOutput(new Text("/test"), new LongWritable(1));
        driver.withOutput(new Text("/foo/bar"), new LongWritable(1));
        driver.withOutput(new Text("/test"), new LongWritable(1));
        driver.runTest();
    }
}
package aws.emr.accesslog;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;
public class AccessLogMapReduceTest {

    private Mapper<LongWritable, Text, Text, LongWritable> mapper;
    private Reducer<Text, LongWritable, Text, LongWritable> reducer;
    private MapReduceDriver<LongWritable, Text, Text, LongWritable, Text, LongWritable> driver;

    @Before
    public void setUp() {
        mapper = new AccessLogMapper();
        reducer = new AccessLogReducer();
        driver = new MapReduceDriver<LongWritable, Text, Text, LongWritable, Text, LongWritable>(
                mapper, reducer);
    }

    /**
     * End-to-end map+reduce over the fixture log: per-URL totals, in the
     * key-sorted order the shuffle produces.
     */
    @Test
    public void testAccessLogMapReduce() throws IOException {
        // Load the fixture relative to this class (the original copy-pasted
        // AccessLogMapperTest.class, which worked only by coincidence of
        // being in the same package).
        BufferedReader br = new BufferedReader(new InputStreamReader(
                AccessLogMapReduceTest.class
                        .getResourceAsStream("/aws/emr/accesslog/access.log")));
        // FIX: close the reader; the original leaked it on every run.
        try {
            String line = null;
            while ((line = br.readLine()) != null) {
                driver.withInput(new LongWritable(1), new Text(line));
            }
        } finally {
            br.close();
        }
        driver.withOutput(new Text("/"), new LongWritable(1));
        driver.withOutput(new Text("/foo/bar"), new LongWritable(1));
        driver.withOutput(new Text("/test"), new LongWritable(2));
        driver.runTest();
    }
}
package aws.emr.accesslog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AccessLogReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Sums the per-URL counts emitted by {@code AccessLogMapper} and writes
     * {@code <url, total>}.
     */
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // FIX: accumulate in a long. The values are LongWritable (64-bit);
        // the original used `int sum`, silently narrowing each value via the
        // compound assignment and risking overflow for high-traffic URLs.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
package aws.emr.accesslog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;
public class AccessLogReducerTest {

    private Reducer<Text, LongWritable, Text, LongWritable> reducer;
    private ReduceDriver<Text, LongWritable, Text, LongWritable> driver;

    @Before
    public void setUp() {
        reducer = new AccessLogReducer();
        driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
                reducer);
    }

    /**
     * Verifies the reducer sums its counts:
     * {@code </test, {1}>} reduces to 1 and {@code </foo/bar, {1,1}>} to 2.
     */
    @Test
    public void testWordCountReduce() throws IOException {
        // A URL hit once: the sum equals the single count.
        List<LongWritable> singleHit = new ArrayList<LongWritable>();
        singleHit.add(new LongWritable(1));
        driver.withInput(new Text("/test"), singleHit);
        driver.withOutput(new Text("/test"), new LongWritable(1));

        // A URL hit twice: the two counts are summed.
        List<LongWritable> doubleHit = new ArrayList<LongWritable>();
        doubleHit.add(new LongWritable(1));
        doubleHit.add(new LongWritable(1));
        driver.withInput(new Text("/foo/bar"), doubleHit);
        driver.withOutput(new Text("/foo/bar"), new LongWritable(2));

        driver.runTest();
    }
}
/ 9
/foo/bar 9
/test 18
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.0.0</version>
<classifier>hadoop1</classifier>
<scope>test</scope>
</dependency>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment