@pasdoy
Last active November 10, 2020
from mrjob.job import MRJob
from mrjob.step import MRStep
from urllib.parse import urlparse
from reppy.robots import Robots
import ujson
import sqlite3
import pika
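
# Dependencies (assumed from the imports above):
#   pip install mrjob reppy ujson pika
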
class MRNewLinks(MRJob):
    def configure_args(self):
        super(MRNewLinks, self).configure_args()
        self.add_file_arg('--database')   # sqlite file caching robots.txt per domain
        self.add_passthru_arg('--mq')     # RabbitMQ host

    def steps(self):
        return [
            # Step 1: deduplicate links and keep only genuinely new ones.
            MRStep(mapper=self.mapper_seen_links,
                   combiner=self.combiner_seen_links,
                   reducer=self.reducer_seen_links),
            # Step 2: check new links against robots.txt and queue them.
            MRStep(reducer_init=self.reducer_banned_links_init,
                   reducer=self.reducer_banned_links,
                   reducer_final=self.reducer_banned_links_final),
        ]
    def mapper_seen_links(self, _, line):
        try:
            j = ujson.loads(line)
            if j['msg'] == 'CrawledURL':
                yield j['CleanedLink'], 'new_link'
            if j['msg'] == 'SeenURL':
                if j['HTTPStatusCode'] // 100 == 3:
                    # a redirect target is a candidate new link
                    yield j['ResultURL'], 'new_link'
                elif j['OriginalURL'] != j['ResultURL']:
                    yield j['ResultURL'], 'seen_link'
                yield j['OriginalURL'], 'seen_link'
        except (ValueError, KeyError):
            pass    # skip malformed or incomplete log lines
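
    # Example of a log line this mapper expects (field names come from the
    # lookups above; the concrete values are illustrative):
    #   {"msg": "SeenURL", "HTTPStatusCode": 301,
    #    "OriginalURL": "http://example.com/a",
    #    "ResultURL": "https://example.com/a"}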
    def combiner_seen_links(self, link, statuses):
        '''Emit each status at most once per link to cut shuffle traffic.'''
        seen_link = False
        new_link = False
        for status in statuses:
            if not seen_link and status == "seen_link":
                yield link, 'seen_link'
                seen_link = True
            if not new_link and status == "new_link":
                yield link, 'new_link'
                new_link = True
    def reducer_seen_links(self, link, statuses):
        if any(status == "seen_link" for status in statuses):
            return
        o = urlparse(link)
        if not o.hostname or not o.scheme:
            return
        # key by scheme://hostname so step 2 handles one domain per reduce call,
        # e.g. ('https://example.com', 'https://example.com/new-page')
        yield o.scheme + '://' + o.hostname, link

    sql = None
    mq = None
    def reducer_banned_links_init(self):
        self.sql = sqlite3.connect(self.options.database or '/Users/curbside1/lambda-go/mrjob/domains.db')
        self.sql.text_factory = str
        host = self.options.mq or 'localhost'
        self.mq = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.mq.channel()
    def reducer_banned_links(self, hostname, links):
        cur = self.sql.cursor()
        cur.execute("select robot, banned from robots where domain = ?", (hostname.split('://')[-1],))
        row = cur.fetchone()
        cur.close()
        if row and row[1]:
            return    # domain is banned: drop all of its links
        if not row:
            # robots.txt not cached yet: ask a downstream worker to fetch it
            self.channel.basic_publish(exchange='',
                                       routing_key='torobots',
                                       body=hostname)
            return
        robots = Robots.parse(hostname, row[0])
        agent = robots.agent('Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)')
        for link in links:
            if agent.allowed(link):
                self.channel.basic_publish(exchange='',
                                           routing_key='tocrawl',
                                           body=link,
                                           properties=pika.BasicProperties(
                                               delivery_mode=2,  # make message persistent
                                           ))
    def reducer_banned_links_final(self):
        self.channel.close()
        self.mq.close()
        self.sql.close()


if __name__ == '__main__':
    MRNewLinks.run()
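
Before running the job, the reducers need a SQLite robots cache and a RabbitMQ broker to talk to. Below is a minimal one-time setup sketch: the robots(domain, robot, banned) schema is inferred from the SELECT in reducer_banned_links and the queue names match the routing keys used above, but the column types and file names are assumptions, not part of the gist.

# setup_pipeline.py (hypothetical helper, not part of the original job)
import sqlite3
import pika

# Robots cache queried by reducer_banned_links; schema inferred from the
# "select robot, banned from robots where domain = ?" statement above.
conn = sqlite3.connect('domains.db')
conn.execute('create table if not exists robots '
             '(domain text primary key, robot text, banned integer)')
conn.commit()
conn.close()

# Queues the reducer publishes to. 'tocrawl' is declared durable so the
# persistent (delivery_mode=2) messages survive a broker restart.
mq = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = mq.channel()
channel.queue_declare(queue='torobots')
channel.queue_declare(queue='tocrawl', durable=True)
channel.close()
mq.close()

With that in place, a local run could look like this (file names are illustrative):

python mr_new_links.py --database domains.db --mq localhost crawl_log.jsonl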