Created June 30, 2015 21:30 — save gist askabelin/4b4a648bb95d302e5d97 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf8 -*- | |
from gevent import monkey | |
from app.pack import Pack | |
monkey.patch_socket() | |
monkey.patch_ssl() | |
monkey.patch_thread() | |
from app import redis | |
from app.page import Page | |
from app.redis_keys import URL_QUEUE, SITE_QUEUE_LEN_PREFIX, SITE_LAST_DOWNLOAD_PREFIX | |
from app.downloader import Downloader | |
from ast import literal_eval | |
from datetime import datetime | |
from requests.exceptions import RequestException | |
from tasks.parse import parse_content | |
import config | |
import gevent | |
import requests | |
import logging | |
from libs.common.Profiler import Chronometer | |
class Consumer(object):
    """Pops URL download tasks from a Redis queue and processes them with a
    pool of gevent worker greenlets, keeping per-site counters in Redis."""

    def start(self):
        """Reset per-site counters, rebuild queue-length counters, then spawn
        the worker greenlets and block until they all exit (they normally
        loop forever).
        """
        # TODO: support changing the number of workers at runtime
        logging.info('starting')
        self._reset_counters()
        self._update_sql_counters()
        workers = [
            gevent.spawn(self.consume, i)
            for i in xrange(config.CONSUMER_WORKERS_COUNT)
        ]
        gevent.joinall(workers)

    @staticmethod
    def _reset_counters():
        """Delete all per-site queue-length and last-download keys."""
        del_list = redis.keys(SITE_QUEUE_LEN_PREFIX + '*')
        del_list.extend(redis.keys(SITE_LAST_DOWNLOAD_PREFIX + '*'))
        # redis.delete() raises when called with no arguments, so only
        # issue the command when there is something to delete (the original
        # worked around this by always passing a bogus '' key).
        if del_list:
            redis.delete(*del_list)

    @staticmethod
    def _update_sql_counters():
        """Rebuild the per-site queue-length counter for every queued URL.

        Increments SITE_QUEUE_LEN_PREFIX + <domain> once per pending entry
        and refreshes the key TTL so stale counters expire on their own.
        """
        pipe = redis.pipeline()
        for url_data in redis.lrange(URL_QUEUE, 0, -1):
            # url_data is the repr of a (pack_id, domain, url) tuple;
            # index 1 is the domain.
            sql_key = SITE_QUEUE_LEN_PREFIX + literal_eval(url_data)[1]
            pipe.incr(sql_key)
            pipe.expire(sql_key, config.PAGE_DOWNLOAD_TIMEOUT * 2)
        pipe.execute()

    def consume(self, consumer_id=None):
        """Worker loop: pop URL tasks from the queue and process them forever.

        :param consumer_id: numeric id of this worker, used only for logging
                            and Chronometer checkpoint names.
        """
        while True:
            try:
                url_data = redis.lpop(URL_QUEUE)
                if url_data is None:
                    # Queue is empty: back off briefly instead of busy-looping.
                    logging.debug('no url_data sleep: {} {} sec'.format(consumer_id, 0.2))
                    gevent.sleep(0.2)
                    continue
                Chronometer().checkpoint('consume{}'.format(consumer_id))
                logging.info('{} start: {}'.format(consumer_id, url_data))
                status = self._consume_url(url_data, consumer_id)
                logging.info('{} {}: {}; in {} sec'.format(
                    consumer_id, status, url_data,
                    Chronometer().checkpoint('consume{}'.format(consumer_id))
                ))
            except Exception:
                # Never let a worker die on a single bad task; log with traceback.
                logging.exception('error in {}: '.format(consumer_id))

    def _consume_url(self, url_data, consumer_id=None):
        """Download one URL and, when content was fetched, queue a parse task.

        :param url_data: repr of a (pack_id, domain, url) tuple.
        :returns: 'ok' when content was downloaded, 'problem' otherwise.

        Always updates the per-site counters and pack statistics in Redis,
        even when the download fails (the finally block).
        """
        status = 'problem'
        pack_id, domain, url = literal_eval(url_data)
        pack = Pack(pack_id)
        page = Page(url)
        try:
            page.clear_page_data()
            content = self._dump_url_data(page, pack, consumer_id)
            if content:
                status = 'ok'
                patterns = pack.get_patterns(page.url)
                occurences = pack.get_occurences(page.url)
                css_selectors = pack.get_css_selectors(page.url)
                exclude_patterns = pack.get_exclude_patterns(page.url)
                # Only schedule parsing when there is something to look for.
                if patterns or occurences or css_selectors:
                    logging.debug('creating task to parse: {}'.format(page.url))
                    parse_content.delay(
                        page.url,
                        patterns,
                        occurences,
                        css_selectors,
                        exclude_patterns
                    )
            return status
        finally:
            pipe = redis.pipeline()
            pipe.decr(SITE_QUEUE_LEN_PREFIX + domain)
            pipe.set(
                SITE_LAST_DOWNLOAD_PREFIX + domain,
                datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
            )
            pipe.expire(SITE_LAST_DOWNLOAD_PREFIX + domain, 3600 * 24)
            pipe.hincrby(pack.key, 'urls_' + status, 1)
            pipe.execute()

    def _dump_url_data(self, page, pack, consumer_id=None):
        """Download a page, recording any error in the page metadata.

        :returns: the downloaded content, or None when the download failed
                  (all exceptions are swallowed after being logged/recorded).
        """
        Chronometer().checkpoint('download{}'.format(consumer_id))
        downloader = Downloader(timeout=config.PAGE_DOWNLOAD_TIMEOUT)
        try:
            return self._try_dump(downloader, page, pack)
        # RequestException already covers requests.exceptions.Timeout, so the
        # original's separate Timeout entry was redundant and is dropped.
        except (gevent.Timeout, RequestException, UnicodeDecodeError) as e:
            logging.warning(
                '{}({}): {}; proxies: {}; in {} sec'.format(
                    e.__class__.__name__, str(e), page.url, downloader.used_proxies,
                    Chronometer().checkpoint('download{}'.format(consumer_id))
                )
            )
            page.save_metadata({'error': '{}: {}'.format(e.__class__.__name__, str(e))})
        except Exception as e:
            logging.exception(e)
            page.save_metadata({'error': '{}: {}'.format(e.__class__.__name__, str(e))})

    def _try_dump(self, downloader, page, pack):
        """Run the download under a hard gevent timeout.

        Passing gevent.Timeout as the second argument makes the block raise
        the exception class itself on expiry, which the caller catches.
        """
        with gevent.Timeout(config.PAGE_DOWNLOAD_TIMEOUT, gevent.Timeout):
            return downloader.download(page, pack)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.