# -*- coding: utf8 -*-
from datetime import datetime
from libs.common.Date import Date
import logging
import time

from app import redis
from app.redis_keys import SITE_PACK_PREFIX, URL_QUEUE, SITE_QUEUE_LEN_PREFIX, \
    SITE_LAST_DOWNLOAD_PREFIX, PACK_META_PREFIX
from app.site import Site, NoPacks
from app.pack import Pack
import config
from utils import unpack_site_pack_key


class Producer(object):
    def __init__(self, do_loop=True):
        self.sites = {}
        self.do_loop = do_loop

    def start(self):
        # Check whether any packs can be finalized
        self._try_finalize_packs()
        # Load the list of packs
        self._load_init_data()
        last_load_init_data = datetime.now()
        while self.do_loop or self.sites:
            urls_produced = 0
            # Fill the queue for the Consumer
            for domain, site in self.sites.copy().iteritems():
                while redis.llen(URL_QUEUE) >= config.MAX_URLS_QUEUE_LEN:
                    logging.info('queue is full')
                    time.sleep(0.5)
                # how many of this site's URLs are already in the queue
                site_queue_len = int(redis.get(SITE_QUEUE_LEN_PREFIX + domain) or 0)
                # how many seconds ago the site was last requested
                last_download_str = redis.get(SITE_LAST_DOWNLOAD_PREFIX + domain)
                delta = None
                if last_download_str:
                    last_download = Date.get_datetime_ms(last_download_str)
                    delta = (datetime.now() - last_download).total_seconds()
                # minimum ∆t between requests to the site
                rate_limit = site.rate_limit
                delta_limit = 1
                if rate_limit:
                    delta_limit = 60 / float(rate_limit)
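                # (60 / rate_limit suggests rate_limit is expressed in requests
                # per minute; delta_limit is then the gap in seconds between hits)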
                # skip the site if it already has a URL in the queue
                # or too little time has passed since the previous request
                if site_queue_len > 0 or (delta is not None and delta < delta_limit):
                    logging.debug(u'skipping {} (sql={}; ∆={}; ∆_limit={})'.format(
                        domain, site_queue_len, delta, delta_limit
                    ))
                    continue
                try:
                    url = site.get_next_url()
                    pipe = redis.pipeline()
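                    # queue both commands and send them in one round trip;
                    # redis-py pipelines default to MULTI/EXEC, so the push and
                    # the per-site counter move together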
                    pipe.rpush(URL_QUEUE, (site.active_pack.id, domain, url))
                    pipe.incr(SITE_QUEUE_LEN_PREFIX + domain)
                    pipe.execute()
                    logging.info('produced {} (sql: {}; delta: {})'.format(url, site_queue_len, delta))
                    site.update_active_pack_meta()
                    urls_produced += 1
                except NoPacks:
                    logging.info('no packs: domain: {}'.format(domain))
                    self.sites.pop(domain)

            if len(self.sites) < config.CONSUMER_WORKERS_COUNT:
                # Reload older packs
                last_init_delta = (datetime.now() - last_load_init_data).total_seconds()
                if not self.sites or last_init_delta > 60:
                    if not self._load_init_data():
                        last_load_init_data = datetime.now()
                        self._try_finalize_packs()
            if len(self.sites) < config.CONSUMER_WORKERS_COUNT:
                # Load new packs
                new_sites_cnt = self._get_new_sites()
                if not self.sites:
                    logging.info('no sites: wait 5 sec...')
                    if self.do_loop:
                        time.sleep(5)
                elif not new_sites_cnt and not urls_produced:
                    logging.info(
                        'all sites skipped: {}'.format(len(self.sites))
                    )
                    if self.do_loop:
                        time.sleep(0.09)
                    else:
                        return

    def _try_finalize_packs(self):
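        # NOTE: redis.keys() walks the entire keyspace and blocks the server
        # while it does; scan_iter() is the usual non-blocking alternative once
        # the keyspace grows large.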
        keys = redis.keys(PACK_META_PREFIX + '*')
        pack_ids = map(lambda x: x[len(PACK_META_PREFIX):], keys)
        logging.info('try finalize packs: {}'.format(len(pack_ids)))
        for pack_id in pack_ids:
            Pack(pack_id).try_finalize()

    def _load_init_data(self):
        return self._load_sites(redis.keys(SITE_PACK_PREFIX + '*'))

    def _get_new_sites(self):
        return self._load_sites(self._new_sites_gen(), True)

    @staticmethod
    def _new_sites_gen():
        while True:
            sp = redis.lpop('new_site_packs')
            if sp:
                yield sp
            else:
                break

    def _load_sites(self, site_pack_keys, load_all=False):
        loaded_cnt = 0
        for site_pack_key in site_pack_keys:
            domain, pack_id = unpack_site_pack_key(site_pack_key)
            site = self.sites.get(domain, Site(domain))
            added = site.add(site_pack_key)
            if not added:
                continue
            logging.info('site_pack to add: {}; sites_cnt: {}'.format(
                site_pack_key, len(self.sites)
            ))
            self.sites.setdefault(domain, site)
            if not load_all and len(self.sites) > 1000:
                # Load sites in batches of 1000
                break
            logging.info('add site_pack: {}'.format(site_pack_key))
            loaded_cnt += 1
        return loaded_cnt
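
The consumer side is not part of this gist, but the Redis keys the producer touches imply its shape: pop an entry from URL_QUEUE, fetch the URL, record the download time under SITE_LAST_DOWNLOAD_PREFIX + domain, and decrement SITE_QUEUE_LEN_PREFIX + domain so the producer will schedule that domain again. Below is a minimal sketch under those assumptions; every name in it except the imported key prefixes is hypothetical, including how the queue payload is serialized.

# -*- coding: utf8 -*-
# Hypothetical consumer loop -- a sketch, not part of the gist.
import ast
from datetime import datetime

from app import redis
from app.redis_keys import URL_QUEUE, SITE_QUEUE_LEN_PREFIX, \
    SITE_LAST_DOWNLOAD_PREFIX


def consume_one(timeout=5):
    # Block until the producer pushes something (or the timeout expires).
    item = redis.blpop(URL_QUEUE, timeout=timeout)
    if item is None:
        return False
    _queue, payload = item
    # The producer rpush()es a Python tuple; older redis-py versions store
    # its str() form, so literal_eval recovers (pack_id, domain, url).
    pack_id, domain, url = ast.literal_eval(payload)

    # ... download `url` and attach the result to the pack `pack_id` ...

    pipe = redis.pipeline()
    # Record when the site was last hit, in whatever format the producer's
    # Date.get_datetime_ms() parses back (assumed here to accept str(datetime)).
    pipe.set(SITE_LAST_DOWNLOAD_PREFIX + domain, str(datetime.now()))
    # Free the domain's slot so the producer schedules it again.
    pipe.decr(SITE_QUEUE_LEN_PREFIX + domain)
    pipe.execute()
    return True

The gist defines only the Producer class, so presumably it is launched elsewhere with something like Producer().start(), alongside config.CONSUMER_WORKERS_COUNT workers running a loop like the sketch above.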