Skip to content

Instantly share code, notes, and snippets.

@aladagemre
Created July 1, 2013 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aladagemre/5899199 to your computer and use it in GitHub Desktop.
Save aladagemre/5899199 to your computer and use it in GitHub Desktop.
/**
 * Runs the "update-table" job and, when it succeeds, the LinkRank job.
 *
 * <p>The update job is partitioned by url, sorted by {url, score} and grouped
 * by url, so inlinks reach the reducer ordered by score.
 *
 * @param args tool arguments; only {@code Nutch.ARG_CRAWL} is read here and it
 *             may be absent, in which case no crawl id is set on the job
 * @return the {@code results} map after the job status has been recorded in it
 * @throws Exception if job setup or execution fails, or if LinkRank throws
 */
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String crawlId = (String)args.get(Nutch.ARG_CRAWL);
  numJobs = 1;
  currentJobNum = 0;
  currentJob = new NutchJob(getConf(), "update-table");
  if (crawlId != null) {
    currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
  }

  //job.setBoolean(ALL, updateAll);
  ScoringFilters scoringFilters = new ScoringFilters(getConf());
  HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
  fields.addAll(scoringFilters.getFields());

  // Partition by {url}, sort by {url,score} and group by {url}.
  // This ensures that the inlinks are sorted by score when they enter
  // the reducer.
  currentJob.setPartitionerClass(UrlOnlyPartitioner.class);
  currentJob.setSortComparatorClass(UrlScoreComparator.class);
  currentJob.setGroupingComparatorClass(UrlOnlyComparator.class);
  StorageUtils.initMapperJob(currentJob, fields, UrlWithScore.class,
      NutchWritable.class, DbUpdateMapper.class);
  StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);

  // Keep the outcome: ranking a table whose update failed would silently
  // produce scores from stale or partial data.
  boolean updateSucceeded = currentJob.waitForCompletion(true);
  if (updateSucceeded) {
    LOG.info("LinkRank starts...");
    LinkRankJob2 linkRankJob = new LinkRankJob2();
    // LinkRankJob2.run reports success as 0; anything else is a failure.
    int linkRankExit = linkRankJob.run(null);
    if (linkRankExit == 0) {
      LOG.info("LinkRank has finished...");
    } else {
      LOG.error("LinkRank failed with exit code " + linkRankExit);
    }
  } else {
    LOG.error("update-table job failed; skipping LinkRank");
  }

  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}
package org.apache.nutch.crawl;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.examples.LinkRank.LinkRankComputation;
import org.apache.giraph.examples.LinkRank.LinkRankVertexMasterCompute;
import org.apache.giraph.examples.LinkRank.NutchTableEdgeInputFormat;
import org.apache.giraph.examples.LinkRank.NutchTableEdgeOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Logger;
/**
 * Hadoop {@link Tool} that launches a Giraph LinkRank computation reading
 * from and writing to the HBase table {@code "webpage"}.
 *
 * <p>{@link #run(String[])} builds its own configuration from hard-coded
 * localhost settings; the configuration passed to {@link #setConf} is stored
 * but not consulted by {@code run}.
 */
public class LinkRankJob2 implements Tool {
  private static final Logger LOG = Logger.getLogger(LinkRankJob2.class);

  /** Configuration supplied through {@link #setConf}; null until then. */
  private GiraphConfiguration conf;

  /** HBase table used as both input and output of the Giraph job. */
  private static final String TABLE_NAME = "webpage";

  /**
   * Configures and runs the Giraph job named "LinkRank2".
   *
   * @param strings command-line arguments; ignored
   * @return 0 if the Giraph job reports success, -1 otherwise
   * @throws Exception if HBase/ZooKeeper cannot be reached or the job fails
   *                   to run
   */
  @Override
  public int run(String[] strings) throws Exception {
    Configuration config = HBaseConfiguration.create();
    // NOTE(review): clear() removes every key loaded by create(), so only the
    // four settings below remain in effect. Confirm this is intentional.
    config.clear();
    config.set("hbase.zookeeper.quorum", "localhost");
    config.set("hbase.zookeeper.property.clientPort", "2181");
    config.set("hbase.master", "localhost:60000");
    config.set("mapred.job.tracker", "localhost:9001");

    // NOTE(review): admin is never closed; whether HBaseAdmin exposes close()
    // depends on the HBase version in use. Verify and close it if available.
    HBaseAdmin admin = new HBaseAdmin(config);
    ZooKeeperWatcher zooKeeperWatcher = new ZooKeeperWatcher(config, "zkw", new Abortable() {
      @Override
      public void abort(String s, Throwable throwable) {
        // Route through the logger so the cause is not lost.
        LOG.error(s, throwable);
      }
    });

    // The watcher is needed only to obtain the quorum string, so release its
    // ZooKeeper connection as soon as that is read (or if the master lookup
    // throws).
    final String zkQuorum;
    try {
      admin.getMaster();
      zkQuorum = zooKeeperWatcher.getQuorum();
    } finally {
      zooKeeperWatcher.close();
    }

    // Start the giraph job
    GiraphConfiguration giraphConf = new GiraphConfiguration(config);
    giraphConf.setZooKeeperConfiguration(zkQuorum);
    giraphConf.setComputationClass(LinkRankComputation.class);
    giraphConf.setMasterComputeClass(LinkRankVertexMasterCompute.class);
    giraphConf.setOutEdgesClass(ByteArrayEdges.class);
    giraphConf.setVertexInputFormatClass(NutchTableEdgeInputFormat.class);
    giraphConf.setVertexOutputFormatClass(NutchTableEdgeOutputFormat.class);
    giraphConf.setInt("giraph.pageRank.superstepCount", 40);
    giraphConf.setWorkerConfiguration(1, 1, 100.0f);
    giraphConf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
    giraphConf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

    GiraphJob giraphJob = new GiraphJob(giraphConf, "LinkRank2");
    return giraphJob.run(false) ? 0 : -1;
  }

  /**
   * Stores a Giraph-typed copy of the given configuration.
   *
   * @param conf configuration to wrap in a new {@link GiraphConfiguration}
   */
  @Override
  public void setConf(final Configuration conf) {
    this.conf = new GiraphConfiguration(conf);
  }

  /**
   * @return the configuration stored by {@link #setConf}, or null if it was
   *         never called
   */
  @Override
  public Configuration getConf() {
    return conf;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment