Skip to content

Instantly share code, notes, and snippets.

@rlskoeser
Forked from meg-codes/cdh-scrape.py
Last active September 12, 2018 18:07
Show Gist options
  • Save rlskoeser/777694bfebf2fea031946316007cf5f0 to your computer and use it in GitHub Desktop.
Save rlskoeser/777694bfebf2fea031946316007cf5f0 to your computer and use it in GitHub Desktop.
A basic web-scrape script designed to get summary information on all resources on a site; includes a report of bad links
#!/usr/bin/env python
# Script to scrape all links from a site, compile counts of each link, status
# codes of access and output the results as a CSV
#
# There's absolutely no reason this shouldn't be pulled into an OOP paradigm
# per se, but I left it functionalized because that can be easier for multitasking.
#
# Requirements:
# requests, bs4
# python3
#
# Usage:
# python scrape.py [--output OUTPUT] [--verbose] base_url
#
# base_url - top level domain, with or without http(s) string.
# This will attempt to crawl all pages, so if a sub portion of the site links
# to /, any linked pages will be crawled anyway.
import argparse
from collections import OrderedDict
import csv
from datetime import datetime
import sys
import urllib.parse
from bs4 import BeautifulSoup, SoupStrainer
import requests
from requests.exceptions import ConnectionError
# status codes whose Location header points at the resource's new home
_REDIRECT_CODES = (301, 302, 303, 307, 308)

# (tag, attribute) pairs that reference other resources, in scan order:
# header link, image, a, script
_LINK_ATTRS = (('link', 'href'), ('img', 'src'), ('a', 'href'), ('script', 'src'))


def _is_crawlable(base_url, url):
    """Return True if url is an http(s) url on the base url's host or a subdomain of it."""
    try:
        base_host = urllib.parse.urlsplit(base_url).netloc.lower()
        target = urllib.parse.urlsplit(url)
    except ValueError:
        # urlsplit rejects malformed urls (e.g. unbalanced IPv6 brackets)
        return False
    # mailto:, tel:, javascript:, ftp: etc. cannot be requested by the session
    if target.scheme not in ('http', 'https') or not base_host:
        return False
    host = target.netloc.lower()
    # compare hosts rather than searching the whole url, so that an off-site
    # url which merely mentions the domain in its path or query is rejected
    return host == base_host or host.endswith('.' + base_host)


def _join(base, ref):
    """Resolve ref against base, returning None if the result cannot be parsed."""
    try:
        return urllib.parse.urljoin(base, ref)
    except ValueError:
        return None


def recurse_scrape(base_url, url, session, results, verbose=False, source_url=None):
    """Scrape a url and recurse into every same-site url it links to.

    Each url is requested at most once. The outcome is stored in ``results``
    under the url; later sightings of the same url only increment that row's
    ``links`` count. Urls that are not http(s), or whose host is neither the
    base url's host nor a subdomain of it, are ignored without a request.
    Redirects are not followed by the session; the redirect itself is
    recorded and its target is scraped as a separate url.

    The crawl is depth-first and recursive, so the call depth grows with the
    length of the longest chain of newly discovered links.

    Args:
        base_url (str) - absolute url whose host limits the crawl
        url (str) - url for the current iteration to scrape for further links
        session (obj) - common requests Session object
        results (dict) - common mapping of url to result row, updated in place
        verbose (bool) - write each url to stdout before requesting it
        source_url (str) - url of the page or redirect where ``url`` was found
    """
    # skip anything we cannot or should not request (mailto, off site, ...)
    if not _is_crawlable(base_url, url):
        return

    # if this is a duplicate, don't request, just note that there is another
    # link to the same resource
    if url in results:
        results[url]['links'] += 1
        # NOTE: skipping anchor difference here;
        # add logic to strip out when storing if we care
        return

    if verbose:
        sys.stdout.write('Scraping: %s\n' % url)

    # get the result, including a redirect but don't follow it, we want to
    # log it separately and run it through the recursive scrape
    try:
        response = session.get(url, allow_redirects=False)
    except requests.exceptions.RequestException:
        # no status code was ever returned (connection failure, invalid url,
        # broken transfer, ...); log it instead of aborting the whole crawl.
        # Same keys as a normal row so the CSV columns always line up.
        now = datetime.utcnow()
        results[url] = {
            'url': url,
            # formatted like an HTTP Date header, matching successful rows
            'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
            'status code': 'CONN-ERR',
            'content type': '',
            'last modified': '',
            'contentlength': '',
            'size': '',
            'links': 1,
            'source url': source_url or '',
            'timestamp': now.isoformat()
        }
        return

    results[url] = {
        'url': url,
        'date': response.headers.get('Date', ''),
        'status code': response.status_code,
        'content type': response.headers.get('Content-Type', ''),
        'last modified': response.headers.get('Last-Modified', ''),
        'contentlength': response.headers.get('Content-Length', ''),
        'size': len(response.content),
        'links': 1,
        # only documents the first place we found it
        'source url': source_url or '',
        # timestamp in isoformat so we can filter on it more easily
        'timestamp': datetime.utcnow().isoformat()
    }

    # scrape the redirect target as its own url; a relative Location is
    # resolved against the url that was requested
    if response.status_code in _REDIRECT_CODES:
        location = response.headers.get('Location')
        target = _join(url, location) if location else None
        if target is not None:
            recurse_scrape(
                base_url,
                target,
                session,
                results,
                verbose=verbose,
                # current url becomes the source url on recursion
                source_url=url
            )
        return

    # any other codes 300-500s are dead-ends as they indicate an error;
    # only an OK html page is parsed for further links
    if response.status_code != requests.codes.ok:
        return
    if not response.headers.get('Content-Type', '').startswith('text/html'):
        return

    page = BeautifulSoup(response.content, features='html.parser')

    # find all links and included content, skipping tags without the attribute
    links = []
    for tag_name, attr in _LINK_ATTRS:
        for tag in page.find_all(tag_name):
            if tag.has_attr(attr):
                links.append(tag[attr])

    for link_url in links:
        # ignore the anchor part of a link
        link_url = link_url.strip().split('#')[0]
        # absolute http(s) links need no modification; everything else is
        # resolved against the page it was found on
        if not link_url.startswith(('http://', 'https://')):
            link_url = _join(url, link_url)
            if link_url is None:
                continue
        recurse_scrape(
            base_url,
            link_url,
            session,
            results,
            verbose=verbose,
            source_url=url
        )
def create_parser():
    """Build the command-line parser for the scraper.

    The parser accepts a positional ``base_url``, an ``--output``/``-o``
    CSV file name (``output.csv`` when omitted) and a ``--verbose``/``-v``
    flag.
    """
    cli = argparse.ArgumentParser(
        description='Utility script for scraping and analyzing a CDH website'
    )
    # (names, keyword options) for each argument, registered in order
    argument_specs = (
        (('base_url',),
         {'action': 'store', 'type': str,
          'help': 'The base url to begin scraping from'}),
        (('--output', '-o'),
         {'action': 'store', 'type': str, 'default': 'output.csv',
          'help': 'name of output CSV file'}),
        (('--verbose', '-v'),
         {'action': 'store_true'}),
    )
    for names, options in argument_specs:
        cli.add_argument(*names, **options)
    return cli
def main():
    """Crawl the site given on the command line, write a CSV and report bad urls.

    The base url is given an ``http://`` scheme when it has none, and a
    permanent redirect on it (e.g. an upgrade to https) replaces the base url
    before crawling. Every crawled url becomes one CSV row in the output
    file; urls whose status is not 200, 301, 302 or CONN-ERR are then listed
    on stdout as problematic.
    """
    # create parser and get args
    args = create_parser().parse_args()

    # build a common session for scraper and set session headers
    session = requests.Session()
    session.headers = {
        'User-Agent': 'cdh-scraper/0.1'
    }

    results = OrderedDict()
    base_url = args.base_url
    # test for the full scheme prefix so hosts such as "httpbin.org" still
    # get a scheme added
    if not base_url.lower().startswith(('http://', 'https://')):
        # regrettably this ends up being neater than using urllib
        # in terms of code readability
        base_url = 'http://%s' % base_url.strip('/')

    # detect a site-wide upgrade to https:// or redirect; if the probe itself
    # fails, leave it to the crawl below to request and record the url
    try:
        response = session.get(base_url, allow_redirects=False)
    except requests.exceptions.RequestException:
        response = None
    if response is not None and response.status_code in (301, 308):
        location = response.headers.get('Location')
        if location:
            sys.stdout.write('Detected an upgrade to https...\n')
            # Location may be relative; resolve it against the probed url
            base_url = urllib.parse.urljoin(base_url, location)

    # begin the recursion
    recurse_scrape(base_url, base_url, session, results, verbose=args.verbose)

    # the header is the union of all row keys in first-seen order, so a row
    # with fewer or more keys than the first one cannot break the writer
    fieldnames = []
    for row in results.values():
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)

    # newline='' lets the csv module control line endings itself
    with open(args.output, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results.values())

    sys.stdout.write('Problematic urls:\n')
    sys.stdout.write('-----------------\n')
    for result in results.values():
        if result['status code'] not in [301, 302, 200, 'CONN-ERR']:
            sys.stdout.write('%s\t%s\t%s\n' % (
                result['url'],
                result['status code'],
                # rows are not guaranteed to carry a source url
                result.get('source url', ''),
            ))
# run the scraper only when this file is executed as a script, not on import
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment