Last active
August 29, 2015 14:24
-
-
Save askabelin/e113fc7d459ffdfd93b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf8 -*- | |
from datetime import datetime | |
from libs.common.Date import Date | |
import logging | |
from app import redis | |
from app.pack import Pack | |
from app.page import Page | |
from app.tasks.parse import parse_content_wrapper | |
import config | |
from utils import unpack_site_pack_key | |
class NoPacks(Exception):
    """Raised when a Site has no packs left to draw URLs from."""
class Site(object):
    """Per-domain scheduler over URL packs.

    A Site owns a list of Pack objects and hands out URLs from one
    "active" pack at a time, rotating between packs according to their
    ``'priority'`` meta value, their collect-time intervals and their
    ``removed`` flag.

    Attributes:
        domain: the site's domain name (also embedded in redis keys).
        packs: list of attached Pack objects.
        active_pack: the Pack URLs are currently drawn from, or None.
        packs_meta_data: ``{pack_id: {'priority': int, ...}}`` meta
            cached from redis via ``Pack.get_redis_meta_data()``.
        packs_counters: ``{pack_id: int}`` weighted round-robin counters
            that drive pack switching.
    """

    def __init__(self, domain):
        self.domain = domain
        self.packs = []
        self.active_pack = None
        self.packs_meta_data = {}  # {pack_id: {'priority': int, ...}}
        self.packs_counters = {}  # {pack_id: int}

    def __str__(self):
        return str(self.domain)

    @property
    def site_pack_key(self):
        # Redis list key holding the active pack's URL queue.
        # NOTE: requires self.active_pack to be set.
        return self._get_site_pack_key(self.active_pack.id)

    @property
    def pack(self):
        # A fresh Pack handle for the active pack id.
        return Pack(self.active_pack.id)

    @property
    def pack_meta(self):
        # Cached meta dict of the active pack; {} when no pack is active.
        return self.packs_meta_data.get(self.active_pack.id, {}) if self.active_pack else {}

    @property
    def rate_limit(self):
        # Pack-specific rate limit, falling back to the global default.
        if self.active_pack is not None:
            return self.active_pack.get_rate_limit()
        return config.DEFAULT_RATE_LIMIT

    def get_next_url(self):
        """Return the next URL to crawl for this site.

        Pops URLs from the active pack's redis queue, switching the
        active pack when the current one is exhausted, removed, outside
        its collect interval, or has used up its priority budget.
        Exhausted or removed packs are detached and finalized.

        Raises:
            NoPacks: when the site has no packs left to draw from.
        """
        url = None
        while not url:
            if not self.packs:
                raise NoPacks()
            if self.active_pack is None:
                self.active_pack = self.packs[0]
                logging.debug(
                    '[{}] set active pack id={}'.format(self.domain, self.active_pack.id)
                )
            if self._pack_falls_in_collect_interval() and not self.active_pack.removed:
                url = self._get_not_cached_url()
            if not url or self._need_switch_active_pack_by_priority() or self.active_pack.removed:
                old_pack = self.active_pack
                if len(self.packs) > 1:
                    self.active_pack = self._get_next_pack()
                    logging.debug('[{}] set next active pack id={}'.format(
                        self.domain, self.active_pack.id
                    ))
                    # reset the priority counter of the pack we switched away from
                    self.packs_counters[old_pack.id] -= self.packs_meta_data[old_pack.id]['priority']
                if url is None or old_pack.removed:
                    # the pack is exhausted or removed: detach and finalize it
                    self._delete_pack(old_pack)
                    old_pack.try_finalize()
                    if self.active_pack.id == old_pack.id:
                        # the pack did not switch but is being deleted,
                        # so there is no active pack any more
                        self.active_pack = None
        logging.debug(
            '[{}] next url: {}; site_pack_key: {}'.format(
                self.domain, url,
                # guard: active_pack may have been reset to None above,
                # and site_pack_key dereferences active_pack.id
                self.site_pack_key if self.active_pack else None
            )
        )
        return url

    def _delete_pack(self, pack):
        """Detach *pack* from the site and drop its redis URL queue."""
        if pack in self.packs:
            self.packs.remove(pack)
        self.packs_meta_data.pop(pack.id, None)
        self.packs_counters.pop(pack.id, None)
        redis.delete(self._get_site_pack_key(pack.id))

    def _need_switch_active_pack_by_priority(self):
        """True when the active pack has used up its priority budget."""
        # TODO: if all packs share one priority and the active pack has
        # the fewest tasks, do not switch.
        if self._is_same_priority():
            return False
        active_id = self.active_pack.id
        return self.packs_counters[active_id] >= self.packs_meta_data[active_id]['priority']

    def _pack_falls_in_collect_interval(self):
        """Check whether "now" is inside the active pack's collect intervals.

        A pack with no configured intervals is always collectable.
        Puts the pack into delayed state when outside its intervals.
        """
        fall_in = True
        collect_intervals = self.active_pack.get_collect_intervals()
        if collect_intervals:
            fall_in = Date.is_time_in_intervals(
                datetime.now(), collect_intervals
            )
        if not fall_in:
            self.active_pack.set_delay_on()
        return fall_in

    def _get_next_pack(self):
        """Return the pack after the active one, wrapping around the list."""
        next_pack_idx = (self.packs.index(self.active_pack) + 1) % len(self.packs)
        return self.packs[next_pack_idx]

    def add(self, site_pack_key):
        """Attach the pack referenced by *site_pack_key* to this site.

        site_pack_key = 'sp_{{domain}}_{{pack_id}}'

        The pack's redis meta data dict looks like::

            pack = {
                'priority': int,
                ...
            }

        Refreshes meta data for all packs, initializes the new pack's
        priority counter and, when the new pack has the strictly highest
        priority, makes it the active pack.

        Returns:
            True on success; None when the pack is already attached or
            its meta lacks a 'priority' key (the pack is then put into
            error state and detached).
        """
        domain, pack_id = unpack_site_pack_key(site_pack_key)
        if pack_id in [p.id for p in self.packs]:
            logging.info('adding site_pack_key {} skipped: site already has this pack'.format(site_pack_key))
            return
        # attach the pack to the site
        pack = Pack(pack_id)
        # refresh meta data of every pack, including the new one
        for p in self.packs + [pack]:
            self.packs_meta_data[p.id] = p.get_redis_meta_data()
        # initialize the priority counter
        self.packs_counters.setdefault(pack.id, 0)
        self.packs.append(pack)
        try:
            # if the new pack has the (strictly) highest priority, activate it
            max_priority = self._get_max_packs_priority()
            new_pack_priority = self.packs_meta_data[pack.id]['priority']
            # typo fix: local was named 'new_pack_hash_max_priority'
            new_pack_has_max_priority = new_pack_priority == max_priority
            active_pack_priority = 0
            if self.active_pack:
                active_pack_priority = self.packs_meta_data[self.active_pack.id]['priority']
            if new_pack_has_max_priority and new_pack_priority > active_pack_priority:
                logging.info('pack {} has highest priority'.format(pack_id))
                self.active_pack = pack
            pack.set_state_inwork()
            return True
        except KeyError as e:
            logging.warning('No priority in pack: {}'.format(pack.id))
            pack.set_state_error(str(e))
            self._delete_pack(pack)

    def update_active_pack_meta(self):
        """Advance the active pack's priority counter by one scheduling step."""
        self.packs_counters[self.active_pack.id] += self._get_min_packs_priority()

    def _get_min_packs_priority(self):
        """Return the lowest 'priority' value among the attached packs' meta."""
        lowest_priority_meta = min(
            self.packs_meta_data.values(), key=lambda x: x['priority']
        )
        return int(lowest_priority_meta['priority'])

    def _get_max_packs_priority(self):
        """Return the highest 'priority' value among the attached packs' meta."""
        # renamed local from the misleading copy-pasted 'lowest_priority_meta'
        highest_priority_meta = max(
            self.packs_meta_data.values(), key=lambda x: x['priority']
        )
        return int(highest_priority_meta['priority'])

    def _is_same_priority(self):
        """True when every attached pack has the same priority.

        Packs whose meta lacks 'priority' are detached and removed,
        after which the original KeyError is re-raised.
        """
        try:
            priorities = set([v['priority'] for v in self.packs_meta_data.values()])
        except KeyError as e:
            logging.warning(e)
            # iterate over a snapshot: _delete_pack mutates packs_meta_data,
            # which would raise RuntimeError when iterating the live dict
            for k, v in list(self.packs_meta_data.items()):
                if 'priority' not in v:
                    logging.warning('No priority in pack: {}; {}'.format(k, str(v)))
                    pack = Pack(k)
                    self._delete_pack(pack)
                    pack.remove()
            raise e
        return len(priorities) == 1

    def _get_site_pack_key(self, pack_id):
        """Redis list key of the URL queue for (domain, pack_id)."""
        return 'sp_{}_{}'.format(self.domain, pack_id)

    def _get_not_cached_url(self):
        """Pop URLs from the active pack's queue until one is not cached.

        Returns None when the queue is empty.  Cached URLs bump the
        'urls_from_cache' counter and, when the pack defines parse
        rules, are re-parsed from cache via parse_content_wrapper.
        """
        while True:
            url = redis.lpop(self.site_pack_key)
            if url is None:
                # queue exhausted
                return url
            elif not Page(url).content_cached(self.pack_meta['cache_days']):
                redis.hincrby(self.active_pack.key, 'urls_queued', 1)
                return url
            else:
                redis.hincrby(self.active_pack.key, 'urls_from_cache', 1)
                if self.pack_meta['patterns'] or self.pack_meta['occurences'] or self.pack_meta.get('css_selectors', ()):
                    parse_content_wrapper(url, self.pack_meta)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment