# -*- coding: utf8 -*-
from datetime import datetime
from libs.common.Date import Date
import logging
import time
from app import redis
from app.redis_keys import SITE_PACK_PREFIX, URL_QUEUE, SITE_QUEUE_LEN_PREFIX, \
    SITE_LAST_DOWNLOAD_PREFIX, PACK_META_PREFIX
from app.site import Site, NoPacks
from app.pack import Pack
import config
from utils import unpack_site_pack_key
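
# Producer drains "site pack" keys from Redis, asks each Site for its next
# URL, and pushes (pack_id, domain, url) jobs onto URL_QUEUE for the Consumer
# workers, enforcing a per-site rate limit between requests.
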
class Producer(object):
    def __init__(self, do_loop=True):
        self.sites = {}
        self.do_loop = do_loop

    def start(self):
        # Check whether any packs are already complete
        self._try_finalize_packs()
        # Load the initial list of packs
        self._load_init_data()
        last_load_init_data = datetime.now()

        while self.do_loop or self.sites:
            urls_produced = 0
            new_sites_cnt = 0  # ensure defined even when no new packs are loaded

            # Fill the queue for the Consumer
            for domain, site in self.sites.copy().iteritems():
                while redis.llen(URL_QUEUE) >= config.MAX_URLS_QUEUE_LEN:
                    logging.info('queue is full')
                    time.sleep(0.5)

                # number of this site's URLs currently in the queue
                site_queue_len = int(redis.get(SITE_QUEUE_LEN_PREFIX + domain) or 0)

                # how many seconds ago the site was last requested
                last_download_str = redis.get(SITE_LAST_DOWNLOAD_PREFIX + domain)
                delta = None
                if last_download_str:
                    last_download = Date.get_datetime_ms(last_download_str)
                    delta = (datetime.now() - last_download).total_seconds()

                # minimum time between requests to the site
                # (rate_limit is requests per minute, hence 60 / rate_limit)
                rate_limit = site.rate_limit
                delta_limit = 1
                if rate_limit:
                    delta_limit = 60 / float(rate_limit)

                # skip the site if it already has a URL in the queue
                # or too little time has passed since the previous request
                if site_queue_len > 0 or (delta is not None and delta < delta_limit):
                    logging.debug(u'skipping {} (sql={}; ∆={}; ∆_limit={})'.format(
                        domain, site_queue_len, delta, delta_limit
                    ))
                    continue

                try:
                    url = site.get_next_url()
                    pipe = redis.pipeline()
                    pipe.rpush(URL_QUEUE, (site.active_pack.id, domain, url))
                    pipe.incr(SITE_QUEUE_LEN_PREFIX + domain)
                    pipe.execute()
                    logging.info('produced {} (sql: {}; delta: {})'.format(
                        url, site_queue_len, delta
                    ))
                    site.update_active_pack_meta()
                    urls_produced += 1
                except NoPacks:
                    logging.info('no packs: domain: {}'.format(domain))
                    self.sites.pop(domain)

            if len(self.sites) < config.CONSUMER_WORKERS_COUNT:
                # Reload old packs
                last_init_delta = (datetime.now() - last_load_init_data).total_seconds()
                if not self.sites or last_init_delta > 60:
                    if not self._load_init_data():
                        last_load_init_data = datetime.now()
                        self._try_finalize_packs()

            if len(self.sites) < config.CONSUMER_WORKERS_COUNT:
                # Load new packs
                new_sites_cnt = self._get_new_sites()

            if not self.sites:
                logging.info('no sites: wait 5 sec...')
                if self.do_loop:
                    time.sleep(5)
            elif not new_sites_cnt and not urls_produced:
                logging.info(
                    'all sites skipped: {}'.format(len(self.sites))
                )
                if self.do_loop:
                    time.sleep(0.09)
                else:
                    return

    def _try_finalize_packs(self):
        keys = redis.keys(PACK_META_PREFIX + '*')
        pack_ids = [key[len(PACK_META_PREFIX):] for key in keys]
        logging.info('try finalize packs: {}'.format(len(pack_ids)))
        for pack_id in pack_ids:
            Pack(pack_id).try_finalize()

    def _load_init_data(self):
        return self._load_sites(redis.keys(SITE_PACK_PREFIX + '*'))

    def _get_new_sites(self):
        return self._load_sites(self._new_sites_gen(), True)

    @staticmethod
    def _new_sites_gen():
        while True:
            sp = redis.lpop('new_site_packs')
            if sp:
                yield sp
            else:
                break

    def _load_sites(self, site_pack_keys, load_all=False):
        loaded_cnt = 0
        for site_pack_key in site_pack_keys:
            domain, pack_id = unpack_site_pack_key(site_pack_key)
            site = self.sites.get(domain, Site(domain))
            added = site.add(site_pack_key)
            if not added:
                continue
            logging.info('site_pack to add: {}; sites_cnt: {}'.format(
                site_pack_key, len(self.sites)
            ))
            self.sites.setdefault(domain, site)
            if not load_all and len(self.sites) > 1000:
                # Load sites in batches of 1000
                break
            logging.info('add site_pack: {}'.format(site_pack_key))
            loaded_cnt += 1
        return loaded_cnt
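
# Minimal usage sketch, assuming the app package, config, and utils modules
# imported above are available on the path:
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    Producer(do_loop=True).start()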