A primitive web crawler for demo purposes
#!/usr/bin/env python3
import argparse
import csv
import requests
import re
from bs4 import BeautifulSoup
import pickle
supported_protocols = [
    'http',
    'https'
]
# Basically every print call in this file should be replaced with a "log"
# function that reports according to the user-requested verbosity.
############## CACHE #################
# In a real project I would use Redis rather than writing to disk myself.
# The file structure here is just a straightforward flat layout, but if I were
# to build this as a real system (and had to use files) I would have used a
# hierarchical dir structure that reflects the URI, meaning a.com/b/c.html
# would have been stored under <workdir>/a.com/b/c/<cache> (see the sketch
# after key_to_filename below).
def sanitize_key(raw_key):
    return re.sub(r'[\\/!@#$?&=:]', '_', raw_key)
def key_to_filename(raw_key):
    # all caching will be done in /tmp for my convenience
    key = sanitize_key(raw_key)
    return "/tmp/%s" % key
def from_cache(key):
    filename = key_to_filename(key)
    try:
        # cached values are stored as raw bytes (see to_cache)
        with open(filename, 'rb') as in_file:
            return in_file.read()
    except IOError:
        # cache miss
        pass
    return None
def to_cache(key, value):
    filename = key_to_filename(key)
    with open(filename, 'wb') as out_file:
        # note that for full html caching the page text would need to be
        # encoded to bytes (unicode handled) as well
        out_file.write(value)
############ URL PARSING UTILS ############
def strip_to_domain(url):
    try:
        uri = url.split("://")[1]
        domain = uri.split("/")[0]
        return domain
    except (IndexError, AttributeError):  # we don't really care which
        return None
def is_same_domain(url1, url2):
    domain1 = strip_to_domain(url1)
    domain2 = strip_to_domain(url2)
    return domain1 == domain2
def is_valid_protocol(url):
    if not url:
        return False
    url_parts = url.split("://")
    protocol = url_parts[0]
    protocol_supported = (protocol in supported_protocols)
    if not protocol_supported:
        try:
            print("Unsupported or missing protocol: '{}'. Currently supported"
                  " protocols: {}".format(protocol, supported_protocols))
        except Exception:
            # no logs, no problem :)
            pass
        return False
    too_many_protocols = (len(url_parts) != 2)
    if too_many_protocols:
        print("Malformed URL: too many protocols! "
              "expected 1 '://', found %s" % str(len(url_parts) - 1))
        return False
    # .. more validations to be added here
    return True
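# The same scheme check could lean on the standard library; a sketch for
# reference only (this helper is hypothetical and is not used by the crawler):
def is_valid_protocol_stdlib(url):
    from urllib.parse import urlparse
    if not url:
        return False
    return urlparse(url).scheme in supported_protocols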
############### HEAVY LIFTING #####################
# Caching is done at the extracted-links level, not the whole-page level.
# This helps achieve two goals: 1. far less work is needed when changing the
# _report_ function. 2. no need to save huge bulky non-ascii files to disk,
# just the already-processed result. The price is less flexibility in the
# analysis stage, so I marked the locations where caching would be needed to
# store entire pages as well.
def fetch_url(url):
    cached_data = None  # from_cache(url) <- to also cache entire pages
    if not cached_data:
        try:
            r = requests.get(url)
            if r.status_code == 200:
                mime_type = r.headers.get('content-type', '').split(';')[0]
                if mime_type == 'text/html':
                    # to_cache(url, r.text) <- to also cache entire pages
                    return r.text
        except requests.ConnectionError:
            print("Could not connect to URL:%s, skipping" % url)
    return cached_data
def extract_links(url, webpage_text):
    soup = BeautifulSoup(webpage_text, features='lxml')
    all_links = [l.get('href') for l in soup.find_all('a')]
    # materialize the filter so the result can be pickled and iterated twice
    links = list(filter(is_valid_protocol, all_links))
    return links
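# Note that relative hrefs (e.g. "/about") carry no "://" and are therefore
# dropped by is_valid_protocol. A sketch of resolving them against the page
# URL before filtering; hypothetical helper, not called anywhere in this file:
def resolve_relative_links(page_url, hrefs):
    from urllib.parse import urljoin
    return [urljoin(page_url, h) for h in hrefs if h]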
def crawl(curr_url, depth, reporter_func):
    if not is_valid_protocol(curr_url):
        return
    cached_links = from_cache(curr_url + "--extracted_links")
    if not cached_links:
        webpage_text = fetch_url(curr_url)
        if not webpage_text:
            return
        links = extract_links(curr_url, webpage_text)
        pickled_links = pickle.dumps(links)
        to_cache(curr_url + "--extracted_links", pickled_links)
    else:
        print("Cache hit for %s" % curr_url)
        links = pickle.loads(cached_links)
    yield reporter_func(curr_url, depth, links)
    if depth > 1:  # this is weird, but that's the spec
        for url in links:
            for report in crawl(url, depth - 1, reporter_func):
                yield report
#################### REPORTING #####################
def calc_link_ratio(curr_url, links):
    if not links:
        return "no links"
    same_domain_links = [l for l in links if is_same_domain(l, curr_url)]
    return float(len(same_domain_links)) / len(links)
def gen_report_line(depth_limit, curr_url, depth, links):
    return [
        curr_url,
        depth_limit - depth + 1,  # invert depth report, to conform with spec
        calc_link_ratio(curr_url, links),
        # .. more report fields
    ]
################## MAIN ####################
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='crawler ex for lightricks')
    parser.add_argument('root_url',
                        help='The URL to use as root')
    parser.add_argument('depth_limit',
                        help='How deep should links be followed')
    parser.add_argument('out_dir', nargs='?', default="/tmp",
                        help='A path to which the result file will be written')
    args = parser.parse_args()
    print("root url:<%s>" % args.root_url)
    depth_limit = int(args.depth_limit)
    print("depth limit:<%s>" % depth_limit)
    out_file_path = '%s/result.tsv' % args.out_dir
    print("out file: <%s>" % out_file_path)
    with open(out_file_path, 'w', newline='') as out_file:
        tsv_writer = csv.writer(out_file, delimiter='\t')
        tsv_writer.writerow(['url', 'depth', 'ratio'])
        seen_urls = set()
        for report_line in crawl(
                args.root_url,
                depth_limit,
                lambda c, d, l: gen_report_line(depth_limit, c, d, l)
        ):
            if report_line and (report_line[0] not in seen_urls):
                tsv_writer.writerow(report_line)
                seen_urls.add(report_line[0])  # remove dup lines
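# Example run (hypothetical URL and file name; requires requests, beautifulsoup4
# and lxml to be installed):
#   python crawler.py https://example.com 2 /tmp
# This writes a tab-separated report to /tmp/result.tsv with the columns
# url, depth and ratio.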