@excavador
Created September 6, 2017 19:11
import os
import sys
import queue
import threading
import re
import requests
import logging
import colorlog
import hashlib
from requests import RequestException, Response
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
VALID_CONTENT_TYPES = ['application/javascript', 'text/javascript', 'application/x-javascript', 'text/html', 'text/css']
DOWNLOADER_WORKERS = 20
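# Retry and HTTPAdapter are imported above but never used. A minimal sketch of how they could be
# wired into a shared requests.Session (the session name and the retry/backoff values below are my
# assumptions, not part of the original script); downloader() would then call session.get(url, ...)
# instead of requests.get(url, ...):
#
# retry_policy = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
# session = requests.Session()
# session.mount('http://', HTTPAdapter(max_retries=retry_policy))
# session.mount('https://', HTTPAdapter(max_retries=retry_policy))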
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.getLogger('requests').setLevel(logging.WARNING)
formatter = colorlog.ColoredFormatter(
    (
        '%(asctime)s '
        '[%(cyan)s%(name)s%(reset)s] '
        '[%(log_color)s%(levelname)s%(reset)s] '
        '%(message_log_color)s%(message)s'
    ),
    reset=True,
    log_colors={
        'DEBUG': 'bold_cyan',
        'INFO': 'bold_green',
        'WARNING': 'bold_yellow',
        'ERROR': 'bold_red',
        'CRITICAL': 'bold_red,bg_white',
    },
    secondary_log_colors={
        'message': {
            'DEBUG': 'white',
            'INFO': 'bold_white',
            'WARNING': 'bold_yellow',
            'ERROR': 'bold_red',
            'CRITICAL': 'bold_red',
        },
    },
    style='%'
)
handler = logging.StreamHandler()
logger.addHandler(handler)
handler.setFormatter(formatter)
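# Data flow between the worker threads:
#   new_links_queue   -> URLs waiting to be fetched by the downloader workers
#   responses_queue   -> successful responses waiting to be parsed for further links
#   finished_p_queue  -> processed URLs (and their content) waiting to be persisted
#   new_links_p_queue -> newly discovered URLs waiting to be persisted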
new_links_queue = queue.Queue()
responses_queue = queue.Queue()
finished_p_queue = queue.Queue()
new_links_p_queue = queue.Queue()
initial_link = sys.argv[1]
initial_domain = urlparse(initial_link).netloc
file_name_hash = hashlib.md5(initial_link.encode('utf-8')).hexdigest()
finished_p_file_path = os.path.join(sys.path[0], 'downloaded-{}.txt'.format(file_name_hash))
new_links_p_file_path = os.path.join(sys.path[0], 'new-{}.txt'.format(file_name_hash))
logger.info('Downloaded links storing into {}'.format(finished_p_file_path))
logger.info('New links storing into {}'.format(new_links_p_file_path))
try:
    with open(finished_p_file_path, 'r') as myfile:
        downloaded_links = myfile.readlines()
        downloaded_links = [x.strip() for x in downloaded_links]
except FileNotFoundError:
    downloaded_links = []
try:
    with open(new_links_p_file_path, 'r') as myfile:
        new_links = myfile.readlines()
        new_links = [x.strip() for x in new_links]
except FileNotFoundError:
    new_links = []
new_links.append(initial_link)
not_downloaded_links = list(set(new_links) - set(downloaded_links))
if not_downloaded_links:
    logger.info('Resuming previous session: {}'.format(not_downloaded_links))
for link in not_downloaded_links:
    new_links_queue.put(link)
def downloader():
    logger.info('Downloader is alive!')
    while True:
        try:
            url = new_links_queue.get()
            if url in downloaded_links:
                continue
            downloaded_links.append(url)
            response = requests.get(url, timeout=15)
            result = {'url': url, 'response': response}
            if response.status_code == 200:
                responses_queue.put(result)
                logger.debug(result)
            else:
                logger.debug('Fail: url={}, code={}'.format(url, response.status_code))
                finished_p_queue.put((url, str(response.status_code)))
        except requests.RequestException as e:
            logger.warning(e)
            finished_p_queue.put((url, str(e)))
        except Exception as e:
            logger.exception(e)
        finally:
            new_links_queue.task_done()
    logger.critical('Downloader is dying!')
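# downloaded_links is a plain list shared by all downloader threads; the "in" check followed by
# append() is not atomic, so two workers can occasionally fetch the same URL twice. A minimal
# sketch of a thread-safe alternative (seen_urls, seen_lock and mark_seen are hypothetical names,
# not part of the original script):
#
# seen_urls = set()
# seen_lock = threading.Lock()
#
# def mark_seen(url):
#     """Return True if url was not seen before, marking it as seen."""
#     with seen_lock:
#         if url in seen_urls:
#             return False
#         seen_urls.add(url)
#         return True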
def parser():
    logger.info('Parser is alive!')
    while True:
        try:
            crawler_result = responses_queue.get()
            response = crawler_result['response']
            url = crawler_result['url']
            domain = urlparse(url).netloc
            # Before parsing the whole document I want to check its content-type.
            # I think that is a bit better than trying to guess the document type from its path
            # (but it's a point to discuss).
            link_content_types = [x.strip() for x in response.headers.get('content-type', '').split(';')]
            if not set(VALID_CONTENT_TYPES).intersection(link_content_types):
                logger.info('Invalid content-type {} for {}'.format(link_content_types, url))
                finished_p_queue.put((url, None))
                continue
            soup = BeautifulSoup(response.text, 'html.parser')
            # I'm collecting links from only a few tags and I'm really unsure about it.
            # The task description is extremely foggy, so this code section is another point to discuss.
            for tag in soup.findAll(['a', 'link', 'script']):
                if 'href' in tag.attrs:
                    link = urljoin(url, tag['href'])
                elif 'src' in tag.attrs:
                    link = urljoin(url, tag['src'])
                else:
                    continue
                if link in downloaded_links:
                    continue
                parsed_link = urlparse(link)
                if parsed_link.netloc != initial_domain:
                    continue
                logger.info('Parser parsed new link: {}'.format(link))
                new_links_p_queue.put(link)
                new_links_queue.put(link)
            finished_p_queue.put((url, response.content))
        # There are a lot of possible exceptions, but the parser worker needs to stay alive in any case.
        # That's why I chose this super-wide catch clause.
        except Exception as e:
            logger.exception(e)
        finally:
            responses_queue.task_done()
    logger.critical('Parser is dying!')
def persister(file_path, target_queue):
    logger.info('Persister is alive! Writing into {}'.format(file_path))
    with open(file_path, 'a+') as myfile:
        while True:
            try:
                result = target_queue.get()
                if isinstance(result, tuple):
                    url, content = result
                else:
                    url = result
                    content = None
                myfile.write(url + '\n')
                myfile.flush()
                if content:
                    path = url.replace('/', '_').replace(':', '_')
                    # response.content is bytes; decode it before writing the page as text
                    if isinstance(content, bytes):
                        content = content.decode('utf-8', errors='replace')
                    with open(path, 'w') as f:
                        f.write(content)
            except Exception as e:
                logger.exception(e)
            finally:
                target_queue.task_done()
    logger.critical('Persister is dying! {}'.format(file_path))
threads = []
# The parser is CPU-bound, so I don't need multiple instances of it.
# In the simplest case I could run it inline (just call parser()), but that would block execution of the following lines.
threads.append(threading.Thread(target=parser))
# The downloader is IO-bound, so I get a performance boost by downloading pages concurrently.
for x in range(0, DOWNLOADER_WORKERS):
    threads.append(threading.Thread(target=downloader))
# The persister is IO-bound too, but multiple instances wouldn't help because each one writes to a single file.
threads.append(threading.Thread(target=persister, args=(finished_p_file_path, finished_p_queue,)))
threads.append(threading.Thread(target=persister, args=(new_links_p_file_path, new_links_p_queue,)))
for t in threads:
    t.start()
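# As written, every worker loops forever and the threads are non-daemon, so the script never exits
# on its own. A minimal sketch of one way to let it finish once all queued work is drained (marking
# the threads as daemons is an assumption about the desired behaviour, not part of the original script):
#
# for t in threads:
#     t.daemon = True  # must be set before t.start(); daemon threads won't keep the process alive
#     t.start()
# new_links_queue.join()    # block until every queued URL has been fetched
# responses_queue.join()    # ... and parsed
# finished_p_queue.join()   # ... and persisted
# new_links_p_queue.join()
#
# Note that this simple join order can still miss links discovered late (the parser can re-fill
# new_links_queue after its join returns); a robust shutdown would loop over the joins or use
# sentinel values to tell the workers to stop.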