@daTokenizer
Created March 4, 2020 06:09
a primitive web crawler for demo purposes
#!/usr/bin/env python3
import argparse
import csv
import pickle
import re

import requests
from bs4 import BeautifulSoup

supported_protocols = [
    'http',
    'https',
]
# Basically every print call in this file should be replaced with a "log"
# function that reports according to the user-requested verbosity.
############## CACHE #################
# In a real project I would use Redis rather than writing to disk myself
# (an illustrative sketch follows the cache helpers below).
# The file structure here is just straightforward and flat, but if I were to
# build this as a real system (and had to use files) I would have used some
# hierarchical dir structure that reflects the URI, meaning a.com/b/c.html
# would have been stored under <workdir>/a.com/b/c/<cache> (sketched below).
def sanitize_key(raw_key):
    return re.sub(r'[\\/!@#$?&=:]', '_', raw_key)


def key_to_filename(raw_key):
    # all caching is done in /tmp for my convenience
    key = sanitize_key(raw_key)
    return "/tmp/%s" % key
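

# Illustrative sketch only, not wired into the crawler: one way to build the
# hierarchical cache layout mentioned above, where a.com/b/c.html maps to
# <workdir>/a.com/b/c/<cache>. The workdir default and the "page.cache" leaf
# name are assumptions made up for this example.
def hierarchical_cache_path(url, workdir="/tmp/crawler-cache"):
    # imports kept local so the sketch stays self-contained
    import os
    from urllib.parse import urlparse
    parsed = urlparse(url)
    # drop the file extension so a.com/b/c.html becomes the directory a.com/b/c
    rel_dir = os.path.splitext(parsed.path.lstrip("/"))[0]
    cache_dir = os.path.join(workdir, parsed.netloc, rel_dir)
    os.makedirs(cache_dir, exist_ok=True)
    return os.path.join(cache_dir, "page.cache")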


def from_cache(key):
    filename = key_to_filename(key)
    try:
        with open(filename, 'rb') as in_file:
            return in_file.read()
    except OSError:
        # cache miss
        pass
    return None


def to_cache(key, value):
    filename = key_to_filename(key)
    # binary mode, since the crawler caches pickled link lists (bytes)
    with open(filename, 'wb') as out_file:
        # note that for full-HTML caching you would need to encode the text as well
        out_file.write(value)
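

# Illustrative sketch only, not wired into the crawler: the Redis-backed cache
# mentioned above, assuming the redis-py package is installed and a Redis
# server is reachable on localhost:6379 (both assumptions for this example).
# These are drop-in counterparts to from_cache/to_cache.
def redis_from_cache(key, host="localhost", port=6379):
    import redis  # kept local so the sketch stays self-contained
    client = redis.Redis(host=host, port=port)
    return client.get(key)  # bytes, or None on a cache miss


def redis_to_cache(key, value, host="localhost", port=6379):
    import redis  # kept local so the sketch stays self-contained
    client = redis.Redis(host=host, port=port)
    client.set(key, value)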


############ URL PARSING UTILS ############
def strip_to_domain(url):
    try:
        uri = url.split("://")[1]
        domain = uri.split("/")[0]
        return domain
    except (IndexError, AttributeError):  # we don't really care which
        return None


def is_same_domain(url1, url2):
    domain1 = strip_to_domain(url1)
    domain2 = strip_to_domain(url2)
    return domain1 == domain2


def is_valid_protocol(url):
    if not url:
        return False
    url_parts = url.split("://")
    protocol = url_parts[0]
    protocol_supported = (protocol in supported_protocols)
    if not protocol_supported:
        try:
            print("Unsupported or missing protocol: '{}'. Currently supported"
                  " protocols: {}".format(protocol, supported_protocols))
        except Exception:
            # no logs, no problem :)
            pass
        return False
    too_many_protocols = (len(url_parts) != 2)
    if too_many_protocols:
        print("Malformed URL: too many protocols! "
              "Expected 1 '://', found %s" % (len(url_parts) - 1))
        return False
    # .. more validations to be added here
    return True


############### HEAVY LIFTING #####################
# Caching is done at the extracted-links level, not at the whole-page level.
# This helps achieve two goals: 1. far less work is needed when changing the
# _report_ function; 2. no need to save huge, bulky non-ASCII files to disk,
# just the already-processed result. The price is less flexibility at the
# analysis stage, so I added comments at the locations where caching would be
# needed to store entire pages as well.
def fetch_url(url):
    cached_data = None  # from_cache(url) <- to also cache entire pages
    if not cached_data:
        try:
            r = requests.get(url)
            if r.status_code == 200:
                mime_type = (r.headers['content-type']).split(';')[0]
                if mime_type == 'text/html':
                    # to_cache(url, r.text.encode()) <- to also cache entire pages
                    return r.text
        except requests.ConnectionError:
            print("Could not connect to URL:%s, skipping" % url)
    return cached_data


def extract_links(url, webpage_text):
    soup = BeautifulSoup(webpage_text, features='lxml')
    all_links = [link.get('href') for link in soup.find_all('a')]
    # materialise the filter so the result can be pickled and len()-checked
    links = list(filter(is_valid_protocol, all_links))
    return links


def crawl(curr_url, depth, reporter_func):
    if not is_valid_protocol(curr_url):
        return
    cached_links = from_cache(curr_url + "--extracted_links")
    if not cached_links:
        webpage_text = fetch_url(curr_url)
        if not webpage_text:
            return
        links = extract_links(curr_url, webpage_text)
        pickled_links = pickle.dumps(links)
        to_cache(curr_url + "--extracted_links", pickled_links)
    else:
        print("Cache hit for %s" % curr_url)
        links = pickle.loads(cached_links)
    yield reporter_func(curr_url, depth, links)
    if depth > 1:  # this is weird, but that's the spec
        for url in links:
            for report in crawl(url, depth - 1, reporter_func):
                yield report


#################### REPORTING #####################
def calc_link_ratio(curr_url, links):
    if not links:
        return "no links"
    same_domain_links = [link for link in links if is_same_domain(link, curr_url)]
    return float(len(same_domain_links)) / len(links)


def gen_report_line(depth_limit, curr_url, depth, links):
    return [
        curr_url,
        depth_limit - depth + 1,  # invert depth report, to conform with the spec
        calc_link_ratio(curr_url, links),
        # .. more report columns
    ]


################## MAIN ####################
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='crawler ex for lightricks')
    parser.add_argument(dest='root_url',
                        help='The URL to use as root')
    parser.add_argument(dest='depth_limit',
                        help='How deep should links be followed')
    parser.add_argument(dest='out_dir', nargs='?', default="/tmp",
                        help='A path to which the result file will be written')
    args = parser.parse_args()

    print("root url:<%s>" % args.root_url)
    depth_limit = int(args.depth_limit)
    print("depth limit:<%s>" % depth_limit)
    out_file_path = '%s/result.tsv' % args.out_dir
    print("out file: <%s>" % out_file_path)

    with open(out_file_path, 'wt') as out_file:
        tsv_writer = csv.writer(out_file, delimiter='\t')
        tsv_writer.writerow(['url', 'depth', 'ratio'])
        seen_urls = set()
        for report_line in crawl(
                args.root_url,
                depth_limit,
                lambda c, d, l: gen_report_line(depth_limit, c, d, l)):
            if report_line and (report_line[0] not in seen_urls):
                tsv_writer.writerow(report_line)
                seen_urls.add(report_line[0])  # remove dup lines
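
# Example invocation (assuming this file is saved as crawler.py and made
# executable; example.com and the depth of 2 are placeholder values):
#   ./crawler.py https://example.com 2 /tmp
# This crawls https://example.com two levels deep and writes /tmp/result.tsv.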