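"""Simple multithreaded crawler.

Starting from a single URL passed on the command line, it downloads pages,
extracts same-domain links with BeautifulSoup, and persists progress to disk
so an interrupted crawl can be resumed.
"""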
import os
import sys
import queue
import threading
import re
import requests
import logging
import colorlog
import hashlib
from requests import RequestException, Response
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
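# Only responses with these content types are parsed for further links;
# DOWNLOADER_WORKERS sets how many concurrent download threads are started.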
VALID_CONTENT_TYPES = ['application/javascript', 'text/javascript', 'application/x-javascript', 'text/html', 'text/css']
DOWNLOADER_WORKERS = 20
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.getLogger('requests').setLevel(logging.WARNING)
formatter = colorlog.ColoredFormatter(
    (
        '%(asctime)s '
        '[%(cyan)s%(name)s%(reset)s] '
        '[%(log_color)s%(levelname)s%(reset)s] '
        '%(message_log_color)s%(message)s'
    ),
    reset=True,
    log_colors={
        'DEBUG': 'bold_cyan',
        'INFO': 'bold_green',
        'WARNING': 'bold_yellow',
        'ERROR': 'bold_red',
        'CRITICAL': 'bold_red,bg_white',
    },
    secondary_log_colors={
        'message': {
            'DEBUG': 'white',
            'INFO': 'bold_white',
            'WARNING': 'bold_yellow',
            'ERROR': 'bold_red',
            'CRITICAL': 'bold_red',
        },
    },
    style='%'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
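# Work queues: new_links_queue feeds the downloader threads, responses_queue
# feeds the parser, and the two *_p_queue queues feed the persister threads
# that record crawl progress on disk.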
new_links_queue = queue.Queue()
responses_queue = queue.Queue()
finished_p_queue = queue.Queue()
new_links_p_queue = queue.Queue()
initial_link = sys.argv[1]
initial_domain = urlparse(initial_link).netloc
file_name_hash = hashlib.md5(initial_link.encode('utf-8')).hexdigest()
finished_p_file_path = os.path.join(sys.path[0], 'downloaded-{}.txt'.format(file_name_hash))
new_links_p_file_path = os.path.join(sys.path[0], 'new-{}.txt'.format(file_name_hash))
logger.info('Storing downloaded links into {}'.format(finished_p_file_path))
logger.info('Storing new links into {}'.format(new_links_p_file_path))
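# Previously downloaded and previously discovered links are reloaded from the
# per-URL progress files, so an interrupted crawl picks up where it stopped.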
try:
    with open(finished_p_file_path, 'r') as myfile:
        downloaded_links = [x.strip() for x in myfile.readlines()]
except FileNotFoundError:
    downloaded_links = []
try:
    with open(new_links_p_file_path, 'r') as myfile:
        new_links = [x.strip() for x in myfile.readlines()]
except FileNotFoundError:
    new_links = []
new_links.append(initial_link)
not_downloaded_links = list(set(new_links) - set(downloaded_links))
if not_downloaded_links:
    logger.info('Resuming previous session: {}'.format(not_downloaded_links))
for link in not_downloaded_links:
    new_links_queue.put(link)
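# Downloader worker: takes URLs off new_links_queue, fetches them with requests,
# and hands successful responses to the parser via responses_queue. Failures are
# recorded to finished_p_queue right away so they are not retried on resume.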
def downloader():
    logger.info('Downloader is alive!')
    while True:
        try:
            url = new_links_queue.get()
            if url in downloaded_links:
                continue
            downloaded_links.append(url)
            response = requests.get(url, timeout=15)
            result = {'url': url, 'response': response}
            if response.status_code == 200:
                responses_queue.put(result)
                logger.debug(result)
            else:
                logger.debug('Fail: url={}, code={}'.format(url, response.status_code))
                finished_p_queue.put((url, str(response.status_code)))
        except requests.RequestException as e:
            logger.warning(e)
            finished_p_queue.put((url, str(e)))
        except Exception as e:
            logger.exception(e)
        finally:
            new_links_queue.task_done()
    logger.critical('Downloader is dying!')
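# Parser worker: takes downloaded responses, checks the Content-Type header,
# extracts href/src links from <a>, <link> and <script> tags, and enqueues
# same-domain links for download. The page content is then handed to the persister.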
def parser():
    logger.info('Parser is alive!')
    while True:
        try:
            crawler_result = responses_queue.get()
            response = crawler_result['response']
            url = crawler_result['url']
            domain = urlparse(url).netloc
            # Before parsing the whole document I want to check its content type.
            # I think this is a bit better than trying to guess the document type
            # from its path (but it's a point to discuss).
            link_content_types = [x.strip() for x in response.headers.get('content-type', '').split(';')]
            if not set(VALID_CONTENT_TYPES).intersection(link_content_types):
                logger.info('Invalid content-type {} for {}'.format(link_content_types, url))
                finished_p_queue.put((url, None))
                continue
            soup = BeautifulSoup(response.text, 'html.parser')
            # I'm collecting links from only a few tags and I'm really unsure about it.
            # The task description is extremely foggy, so this code section is another
            # point to discuss.
            for tag in soup.find_all(['a', 'link', 'script']):
                if 'href' in tag.attrs:
                    link = urljoin(url, tag['href'])
                elif 'src' in tag.attrs:
                    link = urljoin(url, tag['src'])
                else:
                    continue
                if link in downloaded_links:
                    continue
                parsed_link = urlparse(link)
                if parsed_link.netloc != initial_domain:
                    continue
                logger.info('Parser parsed new link: {}'.format(link))
                new_links_p_queue.put(link)
                new_links_queue.put(link)
            finished_p_queue.put((url, response.content))
        # There are a lot of possible exceptions, but the parser worker needs to stay
        # alive in any case. That's why I chose this super-wide catch section.
        except Exception as e:
            logger.exception(e)
        finally:
            responses_queue.task_done()
    logger.critical('Parser is dying!')
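# Persister worker: appends every URL it receives to its progress file and, when
# page content is attached, also writes that content to a file named after the URL.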
def persister(file_path, target_queue):
    logger.info('Persister is alive! Writing into {}'.format(file_path))
    with open(file_path, 'a+') as myfile:
        while True:
            try:
                result = target_queue.get()
                if isinstance(result, tuple):
                    url, content = result
                else:
                    url = result
                    content = None
                myfile.write(url + '\n')
                myfile.flush()
                if content:
                    path = url.replace('/', '_').replace(':', '_')
                    # response.content is bytes, error/status info is str; pick the mode accordingly
                    mode = 'wb' if isinstance(content, bytes) else 'w'
                    with open(path, mode) as f:
                        f.write(content)
            except Exception as e:
                logger.exception(e)
            finally:
                target_queue.task_done()
    logger.critical('Persister is dying! {}'.format(file_path))
threads = []
# The parser is CPU-bound, so I don't need multiple instances of it (with the GIL
# they couldn't run in parallel anyway). In the simplest case I could just call
# parser() directly, but that would block execution of the lines below.
threads.append(threading.Thread(target=parser))
# The downloader is IO-bound, so I get a performance boost by downloading pages
# concurrently from several worker threads.
for x in range(0, DOWNLOADER_WORKERS):
    threads.append(threading.Thread(target=downloader))
# The persisters are IO-bound too, but I can't get a boost from multiple instances
# because each one writes to a single file.
threads.append(threading.Thread(target=persister, args=(finished_p_file_path, finished_p_queue,)))
threads.append(threading.Thread(target=persister, args=(new_links_p_file_path, new_links_p_queue,)))
for t in threads:
    t.start()
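# Example invocation (assuming this file is saved as crawler.py; the URL is hypothetical):
#   python crawler.py https://example.com
# The worker threads loop forever, so the crawl runs until the process is killed.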