Skip to content

Instantly share code, notes, and snippets.

@marblejenka
Created January 20, 2011 18:52
Show Gist options
  • Save marblejenka/788375 to your computer and use it in GitHub Desktop.
Save marblejenka/788375 to your computer and use it in GitHub Desktop.
package slim3.controller.wordcount;
import java.io.PrintWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slim3.controller.Controller;
import org.slim3.controller.Navigation;
import slim3.mapreduce.wordcount.WordCountMapper;
import slim3.mapreduce.wordcount.WordCountReducer;
import slim3.mapreduce.wordcount.writables.WordCountIntermediateKey;
import slim3.mapreduce.wordcount.writables.WordCountIntermediateValue;
import com.google.appengine.mabreaker.MapreduceRunnerUtils;
import com.google.appengine.tools.mapreduce.ConfigurationXmlUtil;
import com.google.appengine.tools.mapreduce.DatastoreInputFormat;
/**
 * Slim3 controller that assembles the word-count MapReduce configuration and
 * prints it to the HTTP response as HTML.
 */
public class RunController extends Controller {

    /**
     * Builds the job configuration, converts it to XML with
     * {@code ConfigurationXmlUtil.convertConfigurationToXml}, turns that into
     * HTML with {@code MapreduceRunnerUtils.generateHtml}, and prints the
     * result to the response output stream.
     *
     * @return the navigation created by {@code forward("run.jsp")}
     * @throws Exception if obtaining the response stream or rendering fails
     */
    @Override
    public Navigation run() throws Exception {
        // Render it as an HTML form so that the user can edit it.
        String html =
            MapreduceRunnerUtils.generateHtml(
                ConfigurationXmlUtil.convertConfigurationToXml(buildJobConfiguration()));

        PrintWriter out = new PrintWriter(response.getOutputStream());
        out.println(html);
        out.close();

        // NOTE(review): the response stream is already closed when this forward
        // is returned; confirm that run.jsp is still rendered as intended.
        return forward("run.jsp");
    }

    /**
     * Creates the word-count job configuration by setting each property
     * explicitly on an empty {@code Configuration} (no defaults loaded).
     *
     * <p>The alternative below is easier to set up, but it also drags in
     * unnecessary settings:
     *
     * <pre>
     * Job job = new Job(new Configuration(false));
     * job.setInputFormatClass(DatastoreInputFormat.class);
     * job.getConfiguration().set(
     *     DatastoreInputFormat.ENTITY_KIND_KEY,
     *     "PBFVotes");
     *
     * job.setMapperClass(WordCountMapper.class);
     * job.setMapOutputKeyClass(WordCountIntermediateKey.class);
     * job.setMapOutputValueClass(WordCountIntermediateValue.class);
     *
     * job.setReducerClass(WordCountReducer.class);
     * </pre>
     *
     * @return a configuration naming the input format, entity kind, mapper,
     *         map output key/value classes and reducer
     */
    private static Configuration buildJobConfiguration() {
        Configuration conf = new Configuration(false);

        // Input: datastore entities of kind "PBFVotes".
        conf.setClass("mapreduce.inputformat.class", DatastoreInputFormat.class, InputFormat.class);
        conf.set(DatastoreInputFormat.ENTITY_KIND_KEY, "PBFVotes");

        // Map phase and its intermediate output types.
        conf.setClass("mapreduce.map.class", WordCountMapper.class, Mapper.class);
        conf.setClass("mapred.mapoutput.key.class", WordCountIntermediateKey.class, Object.class);
        conf.setClass("mapred.mapoutput.value.class", WordCountIntermediateValue.class, Object.class);

        // Reduce phase.
        conf.setClass("mapreduce.reduce.class", WordCountReducer.class, Reducer.class);

        return conf;
    }
}
package slim3.mapreduce.wordcount;
import slim3.mapreduce.wordcount.writables.WordCountIntermediateKey;
import slim3.mapreduce.wordcount.writables.WordCountIntermediateValue;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineMapper;
public class WordCountMapper extends
AppEngineMapper<Key, Entity, WordCountIntermediateKey, WordCountIntermediateValue> {
WordCountIntermediateKey emitKey = new WordCountIntermediateKey();
WordCountIntermediateValue one = new WordCountIntermediateValue(1);
public void map(
Key key,
Entity value,
org.apache.hadoop.mapreduce.Mapper<Key, Entity, WordCountIntermediateKey, WordCountIntermediateValue>.Context context)
throws java.io.IOException, InterruptedException {
emitKey.set(property(value).toString());
context.write(emitKey, one);
};
String property(Entity value) {
Object property = value.getProperty("skub");
if (property == null) {
return "NULL";
}
return property.toString();
}
}
package slim3.mapreduce.wordcount.writables;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import com.google.appengine.tools.mapreduce.OutputKey;
/**
 * Intermediate (map output) key of the word-count job: a single word held in
 * a Hadoop {@link Text}.
 *
 * <p>Ordering, equality and hashing are all defined by the wrapped word, so
 * {@code compareTo}, {@code equals} and {@code hashCode} agree with each other.
 */
public class WordCountIntermediateKey extends OutputKey implements
        WritableComparable<WordCountIntermediateKey> {

    /** The word; the instance is reused and overwritten by {@link #set} and {@link #readFields}. */
    private final Text word = new Text();

    /** Creates a key holding an empty word. */
    public WordCountIntermediateKey() {
    }

    /**
     * Creates a key holding the given word.
     *
     * @param word the word to store
     */
    public WordCountIntermediateKey(String word) {
        // Write the field directly instead of calling the overridable set().
        this.word.set(word);
    }

    /**
     * Replaces the stored word.
     *
     * @param word the new word
     */
    public void set(String word) {
        this.word.set(word);
    }

    /**
     * Returns the backing {@link Text}.
     *
     * @return the internal, mutable text instance (not a copy)
     */
    public Text getWord() {
        return word;
    }

    /**
     * Returns the word as the string form of this key.
     *
     * @return the stored word
     */
    @Override
    public String getKeyString() {
        return word.toString();
    }

    /**
     * Restores this key from the string produced by {@link #getKeyString()}.
     *
     * @param keyString the word to store
     */
    @Override
    public void readFromKeyString(String keyString) {
        set(keyString);
    }

    /**
     * Serializes the word using {@link Text#write(DataOutput)}.
     *
     * @param out destination
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        word.write(out);
    }

    /**
     * Deserializes the word using {@link Text#readFields(DataInput)}.
     *
     * @param in source
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        word.readFields(in);
    }

    /**
     * Orders keys by their words, using {@link Text}'s ordering.
     *
     * @param other the key to compare with
     * @return negative, zero or positive as this word sorts before, equal to
     *         or after the other word
     */
    @Override
    public int compareTo(WordCountIntermediateKey other) {
        return this.word.compareTo(other.word);
    }

    /**
     * Two keys are equal when they hold equal words, consistent with
     * {@link #compareTo(WordCountIntermediateKey)}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof WordCountIntermediateKey)) {
            return false;
        }
        return word.equals(((WordCountIntermediateKey) obj).word);
    }

    /**
     * Hash of the stored word, so that equal keys hash alike regardless of
     * which instance holds them.
     */
    @Override
    public int hashCode() {
        return word.hashCode();
    }
}
package slim3.mapreduce.wordcount.writables;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import com.google.appengine.tools.mapreduce.OutputKey;
/**
 * Intermediate (map output) value of the word-count job: a {@code long} count.
 *
 * <p>Ordering, equality and hashing are all defined by the count, so
 * {@code compareTo}, {@code equals} and {@code hashCode} agree with each other.
 */
public class WordCountIntermediateValue extends OutputKey implements
        WritableComparable<WordCountIntermediateValue> {

    /** The count carried by this value. */
    private long count = 0;

    /** Creates a value with a count of 0. */
    public WordCountIntermediateValue() {
    }

    /**
     * Creates a value with the given count.
     *
     * @param count the count to store
     */
    public WordCountIntermediateValue(long count) {
        // Write the field directly instead of calling the overridable set().
        this.count = count;
    }

    /**
     * Replaces the stored count.
     *
     * @param count the new count
     */
    public void set(long count) {
        this.count = count;
    }

    /**
     * Returns the stored count.
     *
     * @return the count
     */
    public long getCount() {
        return count;
    }

    /**
     * Returns the count in decimal string form.
     *
     * @return {@code String.valueOf(count)}
     */
    @Override
    public String getKeyString() {
        return String.valueOf(count);
    }

    /**
     * Restores the count from the string produced by {@link #getKeyString()}.
     *
     * @param keyString decimal representation of the count
     * @throws NumberFormatException if the string is not a parsable {@code long}
     */
    @Override
    public void readFromKeyString(String keyString) {
        count = Long.parseLong(keyString);
    }

    /**
     * Serializes the count as a single {@code long}.
     *
     * @param out destination
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(count);
    }

    /**
     * Deserializes the count from a single {@code long}.
     *
     * @param in source
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        count = in.readLong();
    }

    /**
     * Orders values by ascending count.
     *
     * @param other the value to compare with
     * @return -1, 0 or 1 as this count is less than, equal to or greater than
     *         the other count
     */
    @Override
    public int compareTo(WordCountIntermediateValue other) {
        // Primitive comparison; avoids allocating two Long boxes per call.
        if (this.count < other.count) {
            return -1;
        }
        if (this.count > other.count) {
            return 1;
        }
        return 0;
    }

    /**
     * Two values are equal when their counts are equal, consistent with
     * {@link #compareTo(WordCountIntermediateValue)}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof WordCountIntermediateValue)) {
            return false;
        }
        return count == ((WordCountIntermediateValue) obj).count;
    }

    /** Hash of the count, computed the same way as {@code Long.hashCode()}. */
    @Override
    public int hashCode() {
        return (int) (count ^ (count >>> 32));
    }
}
package slim3.mapreduce.wordcount;
import org.apache.hadoop.io.LongWritable;
import slim3.mapreduce.wordcount.writables.WordCountIntermediateKey;
import slim3.mapreduce.wordcount.writables.WordCountIntermediateValue;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineReducer;
/**
 * Reducer of the word-count job: adds up the counts received for one key and
 * prints the key string and the total to standard output. Nothing is written
 * to the reducer context.
 */
public class WordCountReducer
        extends
        AppEngineReducer<WordCountIntermediateKey, WordCountIntermediateValue, Key, LongWritable> {

    /** Entity kind name; only referenced by the disabled output code below. */
    final String kind = "WORD_COUNT";

    /** Holder that receives the total computed by each {@code reduce} call. */
    LongWritable emitValue = new LongWritable();

    /**
     * Sums {@code getCount()} over all values of the key, stores the total in
     * {@link #emitValue} and prints {@code <key string>\t<total>}.
     *
     * @param key     the word being reduced
     * @param values  the counts emitted for that word
     * @param context reducer context (currently unused)
     * @throws java.io.IOException  declared by the reducer contract
     * @throws InterruptedException declared by the reducer contract
     */
    public void reduce(
            WordCountIntermediateKey key,
            java.lang.Iterable<WordCountIntermediateValue> values,
            org.apache.hadoop.mapreduce.Reducer<WordCountIntermediateKey, WordCountIntermediateValue, Key, LongWritable>.Context context)
            throws java.io.IOException, InterruptedException {
        long total = 0;
        java.util.Iterator<WordCountIntermediateValue> counts = values.iterator();
        while (counts.hasNext()) {
            total += counts.next().getCount();
        }
        emitValue.set(total);

        StringBuilder line = new StringBuilder();
        line.append(key.getKeyString()).append("\t").append(total);
        System.out.println(line.toString());

        // Does not work:
        // context.getCounter(kind, key.getKeyString()).increment(sum);
        // Does not work either -- or rather, there seems to be no way to
        // specify an output format:
        // context.write(KeyFactory.createKey(kind, key.getKeyString()),
        // emitValue);
    }
}
情報: Initialized DatastoreInputSplit KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_Anti_0") KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_B_0")
Anti 200
2011/01/20 18:45:30 com.google.apphosting.utils.jetty.AppEngineAuthentication$AppEngineAuthenticator authenticate
情報: Returning NOBODY because of SkipAdminCheck.
2011/01/20 18:45:30 com.google.appengine.tools.mapreduce.DatastoreInputSplit readFields
情報: Initialized DatastoreInputSplit KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_F_0") KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_H_0")
2011/01/20 18:45:30 com.google.apphosting.utils.jetty.AppEngineAuthentication$AppEngineAuthenticator authenticate
情報: Returning NOBODY because of SkipAdminCheck.
2011/01/20 18:45:30 com.google.appengine.tools.mapreduce.DatastoreInputSplit readFields
情報: Initialized DatastoreInputSplit KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_J_0") KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_L_0")
2011/01/20 18:45:30 com.google.appengine.tools.mapreduce.DatastoreInputSplit readFields
情報: Initialized DatastoreInputSplit KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_L_0") KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_N_0")
2011/01/20 18:45:30 com.google.apphosting.utils.jetty.AppEngineAuthentication$AppEngineAuthenticator authenticate
情報: Returning NOBODY because of SkipAdminCheck.
NULL 400
2011/01/20 18:45:30 com.google.apphosting.utils.jetty.AppEngineAuthentication$AppEngineAuthenticator authenticate
情報: Returning NOBODY because of SkipAdminCheck.
2011/01/20 18:45:30 com.google.appengine.tools.mapreduce.DatastoreInputSplit readFields
情報: Initialized DatastoreInputSplit KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50_N_0") KeyedValueList("129554911992739c24a1f02c34141a0cc556bd0267f50z")
Pro 200
2011/01/20 18:45:30 com.google.apphosting.utils.jetty.AppEngineAuthentication$AppEngineAuthenticator authenticate
情報: Returning NOBODY because of SkipAdminCheck.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment