Skip to content

Instantly share code, notes, and snippets.

@whatvn
Last active August 29, 2015 14:05
Show Gist options
  • Save whatvn/7050fefa737553d39248 to your computer and use it in GitHub Desktop.
Save whatvn/7050fefa737553d39248 to your computer and use it in GitHub Desktop.
public void process(String topic, ConsumerConnector consumer) {
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
try {
MessageAndMetadata<byte[], byte[]> next = it.next();
String seedUrl = new String(next.message());
_logger.info("Processing url [{}]", seedUrl);
Random random = new Random();
String batchId = String.valueOf(random.nextInt());
Configuration nutchConfiguration = NutchConfiguration.create();
nutchConfiguration.set(BATCH_ID, batchId);
String solrUrl = nutchConfiguration.get(SOLR_URL);
String crawlArgs = String.format("-seedurl %s -depth 5 -topN 10", seedUrl);
// Run Crawl tool
ToolRunner.run(nutchConfiguration, new Crawler(),
tokenize(crawlArgs));
SolrIndexerJob solrIndexerJob = new SolrIndexerJob();
solrIndexerJob.setConf(nutchConfiguration);
solrIndexerJob.indexSolr(solrUrl, batchId);
} catch (Exception ex) {
java.util.logging.Logger.getLogger(ABCrawler.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment