"""crawl.py
This is an exercise in using Trio to asynchronously request a website's sitemaps in a large
batch, recursively visit new sitemaps as they are revealed in the sitemap structure, then add
the discovered URLs to a database along with a flag indicating that those URLs still need to
be crawled. When all sitemaps have been parsed, the crawler requests URLs in batches (10 at a
time in this version) and parses the HTTP responses as they are received. The crawler requires
a website to embed a JSON-LD object that uses either the Article or NewsArticle schema.org
convention; I haven't yet found a news website that doesn't use this convention. (A sketch of
the payload shape the crawler looks for follows the imports below.)
"""
from lxml.html import fromstring as parse_html
import unidecode
from html import unescape
from collections import deque
from bs4 import BeautifulSoup
import httpx
import psycopg2
import pytz
import trio
from urllib.parse import urlparse
import dataset
import datetime
import json
from dateutil.parser import parse as tsparse
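# For reference, the crawler looks for a payload embedded in a
# <script type="application/ld+json"> tag on each article page. A minimal example of the
# shape it expects (values are illustrative, not taken from any real site):
#
#   {"@type": "NewsArticle",
#    "url": "https://www.example.com/story",
#    "headline": "Example headline",
#    "articleBody": "Full article text ...",
#    "articleSection": "World",
#    "datePublished": "2020-03-19T12:00:00-07:00",
#    "author": {"name": "Staff Writer"}}
#
# See the `keymap` in crawl_content_page() below for the full set of fields extracted.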
site_to_crawl = "www.latimes.com"
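# (site_to_crawl must be one of the keys of the `directory` dict below; crawl_sitemaps()
# looks up its seed sitemap index there.)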
directory = {"www.theonion.com": "https://www.theonion.com/sitemap.xml",
"www.nytimes.com": "https://www.nytimes.com/sitemaps/new/sitemap.xml.gz",
"www.buzzfeednews.com": "https://www.buzzfeednews.com/sitemap.xml",
"www.jezebel.com": "https://www.jezebel.com/sitemap.xml",
"entertainment.theonion.com": "https://entertainment.theonion.com/sitemap.xml",
"politics.theonion.com": "https://politics.theonion.com/sitemap.xml",
"local.theonion.com": "https://local.theonion.com/sitemap.xml",
"www.sandiegouniontribune.com": "https://www.sandiegouniontribune.com/sitemap-latest.xml",
"www.latimes.com": "https://www.latimes.com/news-sitemap-latest.xml"}
# Unfortunately, plain global variables are proving to work better here than Trio's
# preferred "memory channel" objects, which occasionally exhausted the operating
# system's supply of file descriptors during longer crawls.
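# (For comparison, a memory-channel version would look roughly like the sketch below;
# this is only an illustration and is not used anywhere in this script:
#
#     send_channel, receive_channel = trio.open_memory_channel(max_buffer_size=50)
#     await send_channel.send(url)           # producer side
#     url = await receive_channel.receive()  # consumer side
# )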
# These hold URLs that have already been visited and can therefore be skipped.
seen = set()
seen_sitemaps = set()
# These hold sitemap and content-page data to be added to the database
# (not actually read anywhere below in this version).
sitemaps = []
entries = []
pages = {}  # presumably intended to map URLs to page data for deduplication
# Running this code requires a working PostgreSQL installation.
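# For example, something along these lines (assuming a local superuser; names and
# passwords here mirror the connection settings below):
#   psql -c "CREATE ROLE kz WITH LOGIN PASSWORD 'admin'"
#   createdb -O kz kz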
db = dataset.connect('postgresql://kz:admin@localhost:5432/kz')
conn = psycopg2.connect(user="kz", password="admin", host="127.0.0.1", port="5432", database="kz")
table = db['crawldb']
sitemap_db = db['sitemaps']
insert_queue = {"crawldb": [], "sitemaps": []}
seen_sitemaps.update([row['url'] for row in sitemap_db])
def create_crawldb_table():
    """Create the crawldb table (one row per content URL) if it doesn't already exist."""
    if 'crawldb' in db.tables:
        return
    create_table_query = '''CREATE TABLE crawldb
                              (id SERIAL PRIMARY KEY,
                               site TEXT,
                               url TEXT NOT NULL UNIQUE,
                               hostname TEXT,
                               path TEXT,
                               crawled boolean NOT NULL DEFAULT FALSE,
                               info json,
                               html text,
                               lastcrawled date,
                               status_code int,
                               ok boolean DEFAULT FALSE,
                               length int DEFAULT 0)'''
    with conn.cursor() as cursor:
        cursor.execute(create_table_query)
    conn.commit()
def create_sitemaps_table():
    """Create the sitemaps table (one row per sitemap URL) if it doesn't already exist."""
    if 'sitemaps' in db.tables:
        return
    create_table_query = '''CREATE TABLE sitemaps
                              (id SERIAL PRIMARY KEY,
                               site TEXT,
                               url TEXT NOT NULL UNIQUE,
                               last_modified date DEFAULT CURRENT_DATE,
                               has_timestamp boolean,
                               needs_crawl boolean DEFAULT TRUE,
                               last_crawled date,
                               is_fresh boolean DEFAULT TRUE,
                               fresh_content_urls int,
                               fresh_sitemap_urls int,
                               fresh_urls int,
                               is_sitemapindex boolean,
                               is_urlset boolean,
                               type text)'''
    with conn.cursor() as cursor:
        cursor.execute(create_table_query)
    conn.commit()
async def fetch_sitemaps(index_url):
    """Fetch a single sitemap (or sitemap index), record what it contains, and queue any
    newly discovered sitemaps and content URLs in the database."""
    site = ".".join(urlparse(index_url).hostname.split(".")[-2:])
    async with httpx.AsyncClient(timeout=60) as client:
        res = await client.get(index_url, timeout=60, headers={
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36",
            "dnt": "1", "cookie": "nyt-a=29482ninwfwe_efw;"})
    soup = BeautifulSoup(res.content, 'xml')
    data = soup.find_all("sitemap")
    urls = soup.find_all("url")
    urlset = soup.find("urlset")
    sitemapindex = soup.find("sitemapindex")
    has_urls = bool(urlset)
    has_sitemaps = bool(sitemapindex)
    doctype = "sitemapindex" if has_sitemaps else "urlset" if has_urls else None
    all_content_urls = set(node.text for node in urlset.find_all("loc") if site in node.text) if urlset else set()
    all_sitemap_urls = set(node.text for node in sitemapindex.find_all("loc")) if sitemapindex else set()
    seen_content_urls = set(row['url'] for row in table.find(site=site))
    seen_sitemap_urls = set(row['url'] for row in sitemap_db.find(site=site))
    fresh_content_urls = all_content_urls.difference(seen_content_urls)
    fresh_sitemap_urls = all_sitemap_urls.difference(seen_sitemap_urls)
    n_fresh_content = len(fresh_content_urls)
    n_fresh_sitemaps = len(fresh_sitemap_urls)
    is_fresh = any([n_fresh_content, n_fresh_sitemaps])
    row = {"url": index_url,
           "crawled": True,
           "needs_crawl": False,
           "last_crawled": datetime.date.today(),
           "is_fresh": is_fresh,
           "fresh_content_urls": n_fresh_content,
           "fresh_sitemap_urls": n_fresh_sitemaps,
           "fresh_urls": n_fresh_content + n_fresh_sitemaps,
           "is_sitemapindex": has_sitemaps,
           "is_urlset": has_urls,
           "type": doctype}
    # Note: 'has_timestamp' is never set on this dict, so this branch is inert as written.
    if is_fresh and 'has_timestamp' in row and not row['has_timestamp']:
        row['last_modified'] = datetime.date.today()
    sitemap_db.upsert(row, ['url'])
    if not is_fresh:
        return
    for sitemap in data:
        sitemap_url = sitemap.find("loc").text
        try:
            last_modified = sitemap.find("lastmod").text
        except AttributeError:
            # No <lastmod> tag on this entry; fall back to the current time.
            last_modified = datetime.datetime.now().strftime("%x")
        row = sitemap_db.find_one(url=sitemap_url)
        exists = bool(row)
        last_crawled = row['last_crawled'] if exists and row['last_crawled'] else datetime.datetime(1, 1, 1).date()
        last_modified_dt = tsparse(last_modified).date() if last_modified else datetime.date.today()
        has_timestamp = bool(last_modified)
        try:
            needs_crawl = last_modified_dt >= last_crawled
        except Exception as e:
            print(f"{e.__class__.__name__} :: {e} \n last_modified_dt = {last_modified_dt}\n last_crawled = {last_crawled}")
            breakpoint()
        if not exists:
            row = {"url": sitemap_url,
                   "site": urlparse(sitemap_url).netloc,
                   "crawled": False,
                   "has_timestamp": has_timestamp,
                   "last_modified": last_modified_dt,
                   "needs_crawl": needs_crawl}
            sitemap_db.insert(row)
            print(json.dumps(row, indent=4, default=str))
        else:
            d = datetime.datetime.now()
            timezone = pytz.timezone("America/Los_Angeles")
            d_aware = timezone.localize(d)
            row = {"url": sitemap_url, "needs_crawl": needs_crawl}
            sitemap_db.update(row, ["url"])
    for entry in urls:
        url = entry.find("loc").text
        entry = table.find_one(url=url)
        if entry:
            print(f"{url} already in database:\n {json.dumps(entry, indent=4, default=str)}")
            continue
        else:
            print(f"Adding {url} to database")
            parsed = urlparse(url)
            site = '.'.join(parsed.netloc.split(".")[-2:])
            row = {"url": url, "site": site, "hostname": parsed.netloc, "path": parsed.path}
            table.upsert(row, ['url'])
create_crawldb_table()
create_sitemaps_table()
async def crawl_sitemaps(site):
    """Repeatedly fetch every sitemap flagged needs_crawl for `site`, in batches of up to
    10 concurrent requests, until no uncrawled sitemaps remain."""
    index_url = directory[site]
    parsed = urlparse(index_url)
    row = {"needs_crawl": True, "site": site, "url": index_url}
    sitemap_db.upsert(row, ['url'])
    n = 0
    while True:
        queue = deque(sitemap_db.find(needs_crawl=True, site=site))
        print(f"{len(queue)} urls in the queue.")
        for i, url in enumerate(queue):
            print(f"{i} :: {url['url']}")
        if any(queue):
            while queue:
                n_requests = min(len(queue), 10)
                before = datetime.datetime.now()
                async with trio.open_nursery() as nursery:
                    for i in range(n_requests):
                        row = queue.pop()
                        url = row['url']
                        nursery.start_soon(fetch_sitemaps, url)
        else:
            print("Finished crawling sitemaps.")
            break
        if n == 0:
            sitemap_db.upsert({"url": index_url, "needs_crawl": False}, ["url"])
        n += 1
async def crawl_content_page(url):
    """Fetch a content page, extract its schema.org Article/NewsArticle JSON-LD payload,
    and update the corresponding crawldb row."""
    # Maps crawldb column names to key paths inside the JSON-LD payload.
    keymap = {"Article": {"url": [('url',)],
                          "article": [('articleBody',)],
                          "description": [('description',)],
                          "image": [('image', 'url')],
                          "headline": [('headline',)],
                          "author": [('author', 'name')],
                          "category": [('articleSection',)],
                          "date": [('datePublished',)],
                          "keywords": [('keywords',)]}}
    keymap['NewsArticle'] = keymap['Article']
    async with httpx.AsyncClient(timeout=30) as client:
        res = await client.get(url, timeout=30, headers={
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36",
            "dnt": "1", "cookie": "nyt-a=29482ninwfwe_efw;"})
    dom = parse_html(res.content)
    info_payload = None
    for payload in dom.xpath("//script[contains(@type,'application/ld+json')]/text()"):
        payload = json.loads(payload)
        try:
            if '@type' in payload and payload['@type'] in keymap:
                info_payload = payload
                ptrs = keymap[payload['@type']]
                obj = {}
                for col, slx in ptrs.items():
                    result = None
                    try:
                        for path in slx:
                            result = payload
                            try:
                                # In case the value is itself a JSON-encoded string.
                                result = json.loads(result)
                            except (TypeError, ValueError):
                                pass
                            for selector in path:
                                if isinstance(result, list):
                                    result = [x[selector] for x in result]
                                else:
                                    result = result[selector]
                            obj[col] = result
                            continue
                    except KeyError:
                        continue
                for k, v in obj.items():
                    if k == 'date':
                        v = tsparse(v)
                    if isinstance(v, list):
                        v = ', '.join(v)
                    if isinstance(v, str):
                        v = unidecode.unidecode(unescape(v))
                    obj[k] = v
                obj['crawled'] = True
                obj['length'] = len(res.content)
                obj['info'] = info_payload
                obj['status_code'] = int(res.status_code)
                obj['ok'] = obj['status_code'] == 200
                obj['last_crawled'] = datetime.date.today()
                print(obj['headline'])
                table.update(obj, ['url'])
        except TypeError as e:
            print(f"{e.__class__.__name__} :: {e} :: {payload}")
async def crawl_urls(site):
    """Crawl all uncrawled coronavirus-related URLs for `site`, 10 at a time."""
    queue = deque([page['url'] for page in table.find(hostname=site, crawled=False) if 'coronavirus' in page['url']])
    while queue:
        n_requests = min(10, len(queue))
        async with trio.open_nursery() as nursery:
            for i in range(n_requests):
                nursery.start_soon(crawl_content_page, queue.pop())
trio.run(crawl_sitemaps, site_to_crawl)
trio.run(crawl_urls, site_to_crawl)
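# Usage note: running this module directly (python crawl.py) first crawls the sitemaps for
# `site_to_crawl`, then crawls the discovered content pages. This assumes the packages
# imported above are installed and the PostgreSQL database configured above is reachable.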