-
-
Save pasdoy/241720d0a2f68cc711820ba6f3ba5633 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mrjob.job import MRJob, MRStep | |
from urlparse import urlparse | |
from reppy.robots import Robots | |
import ujson | |
import sqlite3 | |
import pika | |
class MRNewLinks(MRJob):
    """MapReduce job that extracts newly discovered, crawlable URLs.

    Step 1 de-duplicates links from crawler log records, keeping only the
    URLs never marked as already seen.  Step 2 checks each surviving link
    against its domain's robots.txt (cached in a SQLite ``robots`` table)
    and publishes allowed links to the ``tocrawl`` RabbitMQ queue; domains
    with no cached robots.txt are sent to ``torobots`` so a worker can
    fetch it first.
    """

    # Per-reducer resources, opened in reducer_banned_links_init and
    # closed in reducer_banned_links_final.
    sql = None  # sqlite3 connection to the robots cache
    mq = None   # pika BlockingConnection to RabbitMQ

    def configure_args(self):
        """Register job options: robots DB file and RabbitMQ host."""
        super(MRNewLinks, self).configure_args()
        self.add_file_arg('--database')
        self.add_passthru_arg('--mq')

    def steps(self):
        """Two steps: de-duplicate links, then filter/publish them."""
        return [
            MRStep(mapper=self.mapper_seen_links,
                   combiner=self.combiner_seen_links,
                   reducer=self.reducer_seen_links),
            MRStep(reducer_init=self.reducer_banned_links_init,
                   reducer=self.reducer_banned_links,
                   reducer_final=self.reducer_banned_links_final),
        ]

    def mapper_seen_links(self, _, line):
        """Classify each JSON crawl-log record as 'new_link' or 'seen_link'.

        Malformed lines (bad JSON, missing keys) raise and fail the task;
        upstream is assumed to emit well-formed records.
        """
        j = ujson.loads(line)
        if j['msg'] == 'CrawledURL':
            yield j['CleanedLink'], 'new_link'
        elif j['msg'] == 'SeenURL':
            # Floor division: under Python 3, `301 / 100 == 3.01` would
            # never equal 3 and redirects would be silently missed.
            if j['HTTPStatusCode'] // 100 == 3:
                # 3xx redirect: the target may be a link we never crawled.
                yield j['ResultURL'], 'new_link'
            elif j['OriginalURL'] != j['ResultURL']:
                # URL was rewritten/normalized: mark both forms as seen.
                yield j['ResultURL'], 'seen_link'
                yield j['OriginalURL'], 'seen_link'

    def combiner_seen_links(self, link, statuses):
        """Forward at most one occurrence of each status per link.

        Cuts shuffle traffic: the reducer only needs to know whether a
        status occurred at all, not how many times.
        """
        emitted = set()
        for status in statuses:
            if status in ('seen_link', 'new_link') and status not in emitted:
                emitted.add(status)
                yield link, status

    def reducer_seen_links(self, link, statuses):
        """Emit ('scheme://hostname', link) for links never seen before."""
        if any(status == 'seen_link' for status in statuses):
            return
        parts = urlparse(link)
        # Drop links we could not route to a host (relative URLs, etc.).
        if not parts.hostname or not parts.scheme:
            return
        yield parts.scheme + '://' + parts.hostname, link

    def reducer_banned_links_init(self):
        """Open the robots-cache DB connection and the RabbitMQ channel."""
        # TODO(review): drop the developer-machine fallback path and make
        # --database required; kept for backward compatibility.
        self.sql = sqlite3.connect(
            self.options.database or '/Users/curbside1/lambda-go/mrjob/domains.db')
        self.sql.text_factory = str
        host = self.options.mq or 'localhost'
        self.mq = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.mq.channel()

    def reducer_banned_links(self, hostname, links):
        """Publish robots.txt-allowed links for this host to the crawl queue.

        ``hostname`` arrives as 'scheme://host'.  Banned hosts are dropped
        entirely; hosts without a cached robots.txt are queued on
        'torobots' and their links are skipped for this run.
        """
        cur = self.sql.cursor()
        try:
            cur.execute("select robot, banned from robots where domain = ?",
                        (hostname.split('://')[-1],))
            row = cur.fetchone()
        finally:
            # Always release the cursor, even if the query raises.
            cur.close()
        if row and row[1]:
            return  # domain is flagged banned outright
        if not row:
            # robots.txt not fetched yet: ask a worker to fetch it.
            self.channel.basic_publish(exchange='',
                                       routing_key='torobots',
                                       body=hostname)
            return
        robots = Robots.parse(hostname, row[0])
        agent = robots.agent('Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)')
        for link in links:
            if agent.allowed(link):
                self.channel.basic_publish(exchange='',
                                           routing_key='tocrawl',
                                           body=link,
                                           properties=pika.BasicProperties(
                                               delivery_mode=2,  # persistent message
                                           ))

    def reducer_banned_links_final(self):
        """Close the MQ channel/connection and the SQLite handle."""
        self.channel.close()
        self.mq.close()
        self.sql.close()
# Entry point: hand control to mrjob's runner, which re-invokes this
# script for each mapper/combiner/reducer task.
if __name__ == '__main__':
    MRNewLinks.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment