# -*- coding: utf8 -*-
from gevent import monkey

# Patch sockets/SSL/threads before importing anything that may open connections.
monkey.patch_socket()
monkey.patch_ssl()
monkey.patch_thread()

from ast import literal_eval
from datetime import datetime
import logging

import gevent
import requests
from requests.exceptions import RequestException

import config
from app import redis
from app.downloader import Downloader
from app.pack import Pack
from app.page import Page
from app.redis_keys import URL_QUEUE, SITE_QUEUE_LEN_PREFIX, SITE_LAST_DOWNLOAD_PREFIX
from libs.common.Profiler import Chronometer
from tasks.parse import parse_content
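
# Queue layout, as used below: URL_QUEUE holds repr'd (pack_id, domain, url)
# tuples that are unpacked with literal_eval; per-site bookkeeping lives under
# the SITE_QUEUE_LEN_PREFIX + domain and SITE_LAST_DOWNLOAD_PREFIX + domain keys.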

class Consumer(object):

    def start(self):
        # TODO: support a configurable number of workers
        logging.info('starting')
        self._reset_counters()
        self._update_sql_counters()
        workers = []
        for i in xrange(config.CONSUMER_WORKERS_COUNT):
            workers.append(gevent.spawn(self.consume, i))
        gevent.joinall(workers)

    @staticmethod
    def _reset_counters():
        # '' is a dummy key so redis.delete() is never called with zero arguments
        del_list = ['']
        del_list.extend(redis.keys(SITE_QUEUE_LEN_PREFIX + '*'))
        del_list.extend(redis.keys(SITE_LAST_DOWNLOAD_PREFIX + '*'))
        redis.delete(*del_list)

    @staticmethod
    def _update_sql_counters():
        """
        Increments the per-site queue length counter for every URL already queued.
        """
        pipe = redis.pipeline()
        for url_data in redis.lrange(URL_QUEUE, 0, -1):
            sql_key = SITE_QUEUE_LEN_PREFIX + literal_eval(url_data)[1]
            pipe.incr(sql_key)
            pipe.expire(sql_key, config.PAGE_DOWNLOAD_TIMEOUT * 2)
        pipe.execute()

    def consume(self, consumer_id=None):
        while True:
            try:
                url_data = redis.lpop(URL_QUEUE)
                if url_data is None:
                    logging.debug('{}: queue empty, sleeping {} sec'.format(consumer_id, 0.2))
                    gevent.sleep(0.2)
                    continue
                Chronometer().checkpoint('consume{}'.format(consumer_id))
                logging.info('{} start: {}'.format(consumer_id, url_data))
                status = self._consume_url(url_data, consumer_id)
                logging.info('{} {}: {}; in {} sec'.format(
                    consumer_id, status, url_data,
                    Chronometer().checkpoint('consume{}'.format(consumer_id))
                ))
            except Exception:
                logging.exception('error in {}: '.format(consumer_id))

    def _consume_url(self, url_data, consumer_id=None):
        status = 'problem'
        pack_id, domain, url = literal_eval(url_data)
        pack = Pack(pack_id)
        page = Page(url)
        try:
            page.clear_page_data()
            content = self._dump_url_data(page, pack, consumer_id)
            if content:
                status = 'ok'
                patterns = pack.get_patterns(page.url)
                occurences = pack.get_occurences(page.url)
                css_selectors = pack.get_css_selectors(page.url)
                exclude_patterns = pack.get_exclude_patterns(page.url)
                if patterns or occurences or css_selectors:
                    logging.debug('creating task to parse: {}'.format(page.url))
                    parse_content.delay(
                        page.url,
                        patterns,
                        occurences,
                        css_selectors,
                        exclude_patterns
                    )
            return status
        finally:
            # Update per-site bookkeeping and pack statistics regardless of the outcome.
            pipe = redis.pipeline()
            pipe.decr(SITE_QUEUE_LEN_PREFIX + domain)
            pipe.set(SITE_LAST_DOWNLOAD_PREFIX + domain,
                     datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
            pipe.expire(SITE_LAST_DOWNLOAD_PREFIX + domain, 3600 * 24)
            pipe.hincrby(pack.key, 'urls_' + status, 1)
            pipe.execute()

    def _dump_url_data(self, page, pack, consumer_id=None):
        Chronometer().checkpoint('download{}'.format(consumer_id))
        downloader = Downloader(timeout=config.PAGE_DOWNLOAD_TIMEOUT)
        try:
            return self._try_dump(downloader, page, pack)
        except (gevent.Timeout, requests.exceptions.Timeout, RequestException, UnicodeDecodeError) as e:
            logging.warning(
                '{}({}): {}; proxies: {}; in {} sec'.format(
                    e.__class__.__name__, str(e), page.url, downloader.used_proxies,
                    Chronometer().checkpoint('download{}'.format(consumer_id))
                )
            )
            page.save_metadata({'error': '{}: {}'.format(e.__class__.__name__, str(e))})
        except Exception as e:
            logging.exception(e)
            page.save_metadata({'error': '{}: {}'.format(e.__class__.__name__, str(e))})

    def _try_dump(self, downloader, page, pack):
        # A gevent.Timeout is raised if the download exceeds PAGE_DOWNLOAD_TIMEOUT;
        # _dump_url_data catches and logs it.
        with gevent.Timeout(config.PAGE_DOWNLOAD_TIMEOUT, gevent.Timeout):
            return downloader.download(page, pack)
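

# Usage sketch (illustrative values): a producer is expected to push entries
# shaped like repr((pack_id, domain, url)) onto URL_QUEUE, for example
#   redis.rpush(URL_QUEUE, repr((1, 'example.com', 'http://example.com/')))
# which _consume_url unpacks with literal_eval. Starting the consumer then
# spawns CONSUMER_WORKERS_COUNT greenlets that drain the queue.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    Consumer().start()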