Created
March 19, 2020 19:35
-
-
Save ProbonoBonobo/93b41547e2fca7ffea6677b3ff9b73a7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""crawl.py | |
This is an exercise in using Trio to asynchronously request a website's sitemaps in a large batch, recursively | |
visit new sitemaps as revealed in the sitemap structure, then add discovered URLs to a database along with a | |
flag that indicates those URLs need to be crawled. When all sitemaps have been parsed, the crawler requests a | |
bunch of URLs (50 or so) at a time and parses the HTTP responses as they are received. The crawler requires a | |
website to use embed a JSON object that uses either the Article or NewsArticle schema.org convention; I haven't | |
yet found a news website that doesn't use this convention. | |
""" | |
from lxml.html import fromstring as parse_html | |
import unidecode | |
from html import unescape | |
from collections import deque | |
from bs4 import BeautifulSoup | |
import httpx | |
import psycopg2 | |
import pytz | |
import trio | |
from urllib.parse import urlparse | |
import dataset | |
import datetime | |
import json | |
from dateutil.parser import parse as tsparse | |
site_to_crawl = "www.latimes.com" | |
directory = {"www.theonion.com": "https://www.theonion.com/sitemap.xml", | |
"www.nytimes.com": "https://www.nytimes.com/sitemaps/new/sitemap.xml.gz", | |
"www.buzzfeednews.com": "https://www.buzzfeednews.com/sitemap.xml", | |
"www.jezebel.com": "https://www.jezebel.com/sitemap.xml", | |
"entertainment.theonion.com": "https://entertainment.theonion.com/sitemap.xml", | |
"politics.theonion.com": "https://politics.theonion.com/sitemap.xml", | |
"local.theonion.com": "https://local.theonion.com/sitemap.xml", | |
"www.sandiegouniontribune.com": "https://www.sandiegouniontribune.com/sitemap-latest.xml", | |
"www.latimes.com": "https://www.latimes.com/news-sitemap-latest.xml"} | |
# unfortunately global variables are proving to be better than Trio's | |
# preferred "memory channel" objects, which occasionally exhaust the | |
# operating system of available file descriptor objects during longer | |
# crawls. | |
# these hold references to urls that have been visited, hence can be ignored | |
seen = set() | |
seen_sitemaps = set() | |
# these hold sitemap and content page data to be added to the database | |
sitemaps = [] | |
entries = [] | |
pages = {} # I believe the purpose of this was to map URLs to data for deduplication | |
# running this code requires a working installation of postgres | |
db = dataset.connect('postgresql://kz:admin@localhost:5432/kz') | |
conn = psycopg2.connect(user="kz", password="admin", host="127.0.0.1", port="5432", database="kz") | |
table = db['crawldb'] | |
sitemap_db = db['sitemaps'] | |
insert_queue = {"crawldb": [], "sitemaps": []} | |
seen_sitemaps.update([row['url'] for row in sitemap_db]) | |
def create_crawldb_table():
    """Create the ``crawldb`` table (one row per content URL) unless it
    already exists.  Uses the raw psycopg2 connection because dataset
    does not expose DDL."""
    if 'crawldb' in db.tables:
        return
    ddl = '''CREATE TABLE crawldb
                              (id SERIAL PRIMARY KEY,
                              site TEXT,
                              url TEXT NOT NULL UNIQUE,
                              hostname TEXT,
                              path TEXT,
                              crawled boolean NOT NULL DEFAULT FALSE,
                              info json,
                              html text,
                              lastcrawled date,
                              status_code int,
                              ok boolean DEFAULT FALSE,
                              length int DEFAULT 0)'''
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
def create_sitemaps_table():
    """Create the ``sitemaps`` table (one row per sitemap URL, with
    freshness bookkeeping) unless it already exists."""
    if 'sitemaps' in db.tables:
        return
    ddl = ("CREATE TABLE sitemaps\n"
           " (id SERIAL PRIMARY KEY,\n"
           " site TEXT,\n"
           " url TEXT NOT NULL UNIQUE,\n"
           " last_modified date DEFAULT CURRENT_DATE,\n"
           " has_timestamp boolean,\n"
           " needs_crawl boolean DEFAULT TRUE,\n"
           " last_crawled date,\n"
           " is_fresh boolean default TRUE,\n"
           " fresh_content_urls int,\n"
           " fresh_sitemap_urls int,\n"
           " fresh_urls int,\n"
           " is_sitemapindex boolean,\n"
           " is_urlset boolean,\n"
           " type text)")
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
async def fetch_sitemaps(index_url):
    """Fetch one sitemap URL and persist everything it reveals.

    Handles both document types: a ``<sitemapindex>`` (child sitemaps are
    upserted into the ``sitemaps`` table with a ``needs_crawl`` flag) and a
    ``<urlset>`` (content URLs are upserted into ``crawldb``).  Freshness
    counters let the caller skip sitemaps whose URLs are all already known.

    Fixes vs. the original: narrow exception handling instead of a bare
    ``except:``, removal of a leftover ``breakpoint()``, ISO-format date
    fallback instead of the locale-dependent ``strftime("%x")``, and removal
    of dead code (an always-false ``'has_timestamp' in row`` branch and
    unused timezone locals).
    """
    # e.g. "www.nytimes.com" -> "nytimes.com"
    site = ".".join(urlparse(index_url).hostname.split(".")[-2:])
    async with httpx.AsyncClient(timeout=60) as client:
        res = await client.get(index_url, timeout=60, headers={
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36",
            "dnt": "1", "cookie": "nyt-a=29482ninwfwe_efw;"})
    soup = BeautifulSoup(res.content, 'xml')
    data = soup.find_all("sitemap")
    urls = soup.find_all("url")
    urlset = soup.find("urlset")
    sitemapindex = soup.find("sitemapindex")
    has_urls = bool(urlset)
    has_sitemaps = bool(sitemapindex)
    doctype = "sitemapindex" if has_sitemaps else "urlset" if has_urls else None
    all_content_urls = {node.text for node in urlset.find_all("loc") if site in node.text} if urlset else set()
    # (fixed: original filtered each node on the constant-true `if sitemapindex`)
    all_sitemap_urls = {node.text for node in sitemapindex.find_all("loc")} if sitemapindex else set()
    seen_content_urls = {row['url'] for row in table.find(site=site)}
    seen_sitemap_urls = {row['url'] for row in sitemap_db.find(site=site)}
    n_fresh_content = len(all_content_urls - seen_content_urls)
    n_fresh_sitemaps = len(all_sitemap_urls - seen_sitemap_urls)
    is_fresh = bool(n_fresh_content or n_fresh_sitemaps)
    # Bookkeeping for this sitemap itself.
    sitemap_db.upsert({"url": index_url,
                       "crawled": True,
                       "needs_crawl": False,
                       "last_crawled": datetime.date.today(),
                       "is_fresh": is_fresh,
                       "fresh_content_urls": n_fresh_content,
                       "fresh_sitemap_urls": n_fresh_sitemaps,
                       "fresh_urls": n_fresh_content + n_fresh_sitemaps,
                       "is_sitemapindex": has_sitemaps,
                       "is_urlset": has_urls,
                       "type": doctype}, ['url'])
    if not is_fresh:
        return
    # Queue any child sitemaps for crawling.
    for sitemap in data:
        sitemap_url = sitemap.find("loc").text
        try:
            last_modified = sitemap.find("lastmod").text
        except AttributeError:
            # no <lastmod> child: find() returned None.  ISO format is
            # unambiguous for dateutil, unlike the old locale-bound "%x".
            last_modified = datetime.date.today().isoformat()
        row = sitemap_db.find_one(url=sitemap_url)
        exists = bool(row)
        last_crawled = row['last_crawled'] if exists and row['last_crawled'] else datetime.datetime(1, 1, 1).date()
        last_modified_dt = tsparse(last_modified).date() if last_modified else datetime.date.today()
        has_timestamp = bool(last_modified)
        try:
            needs_crawl = last_modified_dt >= last_crawled
        except TypeError as e:
            # incomparable date types in the DB row: err on the side of
            # re-crawling (the original dropped into breakpoint() here)
            print(f"{e.__class__.__name__} :: {e} \n last_modified_dt = {last_modified_dt}\n last_crawled = {last_crawled}")
            needs_crawl = True
        if not exists:
            row = {"url": sitemap_url,
                   "site": urlparse(sitemap_url).netloc,
                   "crawled": False,
                   "has_timestamp": has_timestamp,
                   "last_modified": last_modified_dt,
                   "needs_crawl": needs_crawl}
            sitemap_db.insert(row)
            print(json.dumps(row, indent=4, default=str))
        else:
            sitemap_db.update({"url": sitemap_url, "needs_crawl": needs_crawl}, ["url"])
    # Queue any content URLs for crawling.
    for entry in urls:
        url = entry.find("loc").text
        known = table.find_one(url=url)
        if known:
            print(f"{url} already in database:\n {json.dumps(known, indent=4, default=str)}")
            continue
        print(f"Adding {url} to database")
        parsed = urlparse(url)
        table.upsert({"url": url,
                      "site": '.'.join(parsed.netloc.split(".")[-2:]),
                      "hostname": parsed.netloc,
                      "path": parsed.path}, ['url'])
# Ensure both tables exist before any crawling starts (each call is a
# no-op when its table is already present).
create_crawldb_table()
create_sitemaps_table()
async def crawl_sitemaps(site):
    """Drive sitemap discovery for `site` to a fixed point.

    Seeds the queue with the site's sitemap index from `directory`, then
    repeatedly fetches every sitemap flagged ``needs_crawl`` in batches of
    up to 10 concurrent requests (``fetch_sitemaps`` flags newly discovered
    child sitemaps, which are picked up on the next pass).  Terminates when
    a pass finds nothing left to crawl.

    Fixes vs. the original: removed unused locals (``parsed``, ``before``),
    replaced ``if any(queue)`` with a plain truthiness check, and replaced
    the integer counter with a first-pass flag.
    """
    index_url = directory[site]
    sitemap_db.upsert({"needs_crawl": True, "site": site, "url": index_url}, ['url'])
    first_pass = True
    while True:
        queue = deque(sitemap_db.find(needs_crawl=True, site=site))
        print(f"{len(queue)} urls in the queue.")
        for i, row in enumerate(queue):
            print(f"{i} :: {row['url']}")
        if not queue:
            print(f"Finished crawling sitemaps.")
            break
        while queue:
            n_requests = min(len(queue), 10)
            # one nursery per batch: all requests in the batch complete
            # before the next batch starts
            async with trio.open_nursery() as nursery:
                for _ in range(n_requests):
                    nursery.start_soon(fetch_sitemaps, queue.pop()['url'])
        if first_pass:
            # the index itself is done; only children remain
            sitemap_db.upsert({"url": index_url, "needs_crawl": False}, ["url"])
            first_pass = False
async def crawl_content_page(url):
    """Fetch a content page and persist its schema.org article metadata.

    Scans every ``application/ld+json`` script on the page for a payload of
    @type Article/NewsArticle, maps its fields onto ``crawldb`` columns via
    ``keymap``, normalizes the values (timestamp parsing, list joining,
    HTML-unescaping + ASCII transliteration), and updates the row for `url`.

    Fixes vs. the original: malformed JSON-LD is skipped instead of
    crashing the task, the inner bare ``except:`` is narrowed, and a
    missing headline no longer raises ``KeyError``.
    """
    # crawldb column -> candidate key paths inside the JSON-LD payload
    keymap = {"Article": {"url": [('url',)],
                          "article": [('articleBody',)],
                          "description": [('description',)],
                          "image": [('image', 'url')],
                          "headline": [('headline',)],
                          "author": [('author', 'name')],
                          "category": [('articleSection',)],
                          "date": [('datePublished',)],
                          "keywords": [('keywords',)]}}
    keymap['NewsArticle'] = keymap['Article']
    async with httpx.AsyncClient(timeout=30) as client:
        res = await client.get(url, timeout=30, headers={
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36",
            "dnt": "1", "cookie": "nyt-a=29482ninwfwe_efw;"})
    dom = parse_html(res.content)
    for raw in dom.xpath("//script[contains(@type,'application/ld+json')]/text()"):
        try:
            payload = json.loads(raw)
        except ValueError:
            continue  # malformed JSON-LD: skip this script tag
        try:
            if '@type' not in payload or payload['@type'] not in keymap:
                continue
            obj = {}
            for col, paths in keymap[payload['@type']].items():
                try:
                    for path in paths:
                        result = payload
                        try:
                            # handle double-encoded payloads (a JSON string
                            # containing JSON); no-op for dicts/lists
                            result = json.loads(result)
                        except (TypeError, ValueError):
                            pass
                        for selector in path:
                            if isinstance(result, list):
                                result = [x[selector] for x in result]
                            else:
                                result = result[selector]
                        obj[col] = result
                except KeyError:
                    continue  # this column's path is absent; try next column
            # Normalize extracted values for storage.
            for k, v in obj.items():
                if k == 'date':
                    v = tsparse(v)
                if isinstance(v, list):
                    v = ', '.join(v)
                if isinstance(v, str):
                    v = unidecode.unidecode(unescape(v))
                obj[k] = v
            obj['crawled'] = True
            obj['length'] = len(res.content)
            obj['info'] = payload
            obj['status_code'] = int(res.status_code)
            obj['ok'] = obj['status_code'] == 200
            obj['last_crawled'] = datetime.date.today()
            print(obj.get('headline'))  # .get(): headline may be absent
            table.update(obj, ['url'])
        except TypeError as e:
            # e.g. payload is a list and `'@type' in payload` semantics differ
            print(f"{e.__class__.__name__} :: {e} :: {payload}")
async def crawl_urls(site, keyword="coronavirus"):
    """Crawl all un-crawled content pages for `site` whose URL contains
    `keyword`, in batches of up to 10 concurrent requests.

    `keyword` generalizes the previously hard-coded 'coronavirus' filter;
    the default preserves the original behavior.  Pass keyword="" to crawl
    every un-crawled URL for the site.
    """
    queue = deque(page['url']
                  for page in table.find(hostname=site, crawled=False)
                  if keyword in page['url'])
    while queue:
        n_requests = min(10, len(queue))
        # one nursery per batch so at most 10 requests run concurrently
        async with trio.open_nursery() as nursery:
            for _ in range(n_requests):
                nursery.start_soon(crawl_content_page, queue.pop())
# Entry point: first discover URLs via the sitemap tree, then crawl the
# discovered content pages.  Each trio.run() call owns its own event loop.
trio.run(crawl_sitemaps, site_to_crawl)
trio.run(crawl_urls, site_to_crawl)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment