Skip to content

Instantly share code, notes, and snippets.

@askabelin
Last active August 29, 2015 14:24
Show Gist options
  • Save askabelin/e113fc7d459ffdfd93b3 to your computer and use it in GitHub Desktop.
Save askabelin/e113fc7d459ffdfd93b3 to your computer and use it in GitHub Desktop.
# -*- coding: utf8 -*-
from datetime import datetime
from libs.common.Date import Date
import logging
from app import redis
from app.pack import Pack
from app.page import Page
from app.tasks.parse import parse_content_wrapper
import config
from utils import unpack_site_pack_key
class NoPacks(Exception):
    """Raised by Site.get_next_url() when the site has no packs left to serve URLs from."""
    pass
class Site(object):
    """A crawl target (one domain) holding a rotating collection of URL packs.

    A "pack" is a batch of URLs queued in redis under the list key
    'sp_{domain}_{pack_id}'.  The site rotates between its packs according
    to per-pack integer priorities and hands out one URL at a time via
    ``get_next_url()``.

    NOTE(review): this class was recovered from an indentation-stripped
    paste; the block nesting below is the most plausible reconstruction.
    """

    def __init__(self, domain):
        # Domain this site represents (used in redis keys and log lines).
        self.domain = domain
        # All Pack objects currently attached to this site.
        self.packs = []
        # The pack URLs are currently drawn from; None until first use.
        self.active_pack = None
        # Per-pack metadata loaded from redis: {pack_id (int): {'priority': int, ...}}
        self.packs_meta_data = {}
        # Priority counters driving the round-robin switch: {pack_id (int): int}
        self.packs_counters = {}

    def __str__(self):
        return str(self.domain)

    @property
    def site_pack_key(self):
        # Redis list key of the active pack's URL queue.
        # NOTE(review): raises AttributeError when active_pack is None.
        return self._get_site_pack_key(self.active_pack.id)

    @property
    def pack(self):
        # Fresh Pack instance built from the active pack's id.
        return Pack(self.active_pack.id)

    @property
    def pack_meta(self):
        # Cached metadata of the active pack; empty dict when nothing is active.
        return self.packs_meta_data.get(self.active_pack.id, {}) if self.active_pack else {}

    @property
    def rate_limit(self):
        # Per-pack rate limit when a pack is active, global default otherwise.
        if self.active_pack is not None:
            return self.active_pack.get_rate_limit()
        return config.DEFAULT_RATE_LIMIT

    def get_next_url(self):
        u"""Return the next URL to crawl, rotating between packs by priority.

        Raises:
            NoPacks: when the site has no packs attached.
        """
        url = None
        while not url:
            if not self.packs:
                raise NoPacks()
            if self.active_pack is None:
                # No active pack yet -- start with the first one.
                self.active_pack = self.packs[0]
                logging.debug(
                    '[{}] set active pack id={}'.format(self.domain, self.active_pack.id)
                )
            # Only pull a URL while the pack is inside its collect time
            # window and has not been marked removed.
            if self._pack_falls_in_collect_interval() and not self.active_pack.removed:
                url = self._get_not_cached_url()
            if not url or self._need_switch_active_pack_by_priority() or self.active_pack.removed:
                old_pack = self.active_pack
                if len(self.packs) > 1:
                    # Rotate to the next pack in round-robin order.
                    self.active_pack = self._get_next_pack()
                    logging.debug('[{}] set next active pack id={}'.format(self.domain, self.active_pack.id))
                # Reset the priority counter of the pack we are leaving.
                self.packs_counters[old_pack.id] -= self.packs_meta_data[old_pack.id]['priority']
                if url is None or old_pack.removed:
                    # The pack is exhausted or removed -- detach and finalize it.
                    self._delete_pack(old_pack)
                    old_pack.try_finalize()
                    if self.active_pack.id == old_pack.id:
                        # The pack did not switch (it was the only one) but is
                        # being deleted, so drop the active pack back to None.
                        self.active_pack = None
        # NOTE(review): if the active (removed) pack was the only one, a URL
        # may still be returned here with active_pack == None, making the
        # site_pack_key property raise AttributeError -- confirm.
        logging.debug(
            '[{}] next url: {}; site_pack_key: {}'.format(
                self.domain, url, self.site_pack_key
            )
        )
        return url

    def _delete_pack(self, pack):
        """Detach *pack* from the site and delete its redis URL queue."""
        if pack in self.packs:
            self.packs.remove(pack)
        self.packs_meta_data.pop(pack.id, None)
        self.packs_counters.pop(pack.id, None)
        redis.delete(self._get_site_pack_key(pack.id))

    def _need_switch_active_pack_by_priority(self):
        # TODO: if all packs share one priority and the current pack has the
        # fewest pending tasks, do not switch.
        if self._is_same_priority():
            return False
        # Switch once the active pack has consumed its priority quota.
        return self.packs_counters[self.active_pack.id] >= self.packs_meta_data[self.active_pack.id]['priority']

    def _pack_falls_in_collect_interval(self):
        """Return True when now is inside the active pack's collect intervals.

        A pack with no configured intervals is always collectible.  When the
        check fails, the active pack is flagged as delayed.
        """
        fall_in = True
        collect_intervals = self.active_pack.get_collect_intervals()
        if collect_intervals:
            fall_in = Date.is_time_in_intervals(
                datetime.now(), collect_intervals
            )
        if not fall_in:
            self.active_pack.set_delay_on()
        return fall_in

    def _get_next_pack(self):
        # Round-robin: the pack after the active one, wrapping to the start.
        next_pack_idx = (self.packs.index(self.active_pack) + 1) % len(self.packs)
        return self.packs[next_pack_idx]

    def add(self, site_pack_key):
        """
        Attach a new pack to the site.

        site_pack_key = 'sp_{{domain}}_{{pack_id}}'
        Pack metadata dict (loaded from redis):
        pack = {
            'priority': int,
            ...
        }

        Returns True when the pack was added successfully; returns None when
        it is skipped (already attached) or rejected for missing 'priority'.
        """
        domain, pack_id = unpack_site_pack_key(site_pack_key)
        if pack_id in [p.id for p in self.packs]:
            logging.info('adding site_pack_key {} skipped: site already has this pack'.format(site_pack_key))
            return
        # Attach the pack to the site.
        pack = Pack(pack_id)
        # Refresh metadata for all packs, including the new one.
        for p in self.packs + [pack]:
            self.packs_meta_data[p.id] = p.get_redis_meta_data()
        # Initialize the priority counter for the new pack.
        self.packs_counters.setdefault(pack.id, 0)
        self.packs.append(pack)
        # If this pack has the highest priority, make it the active one.
        try:
            max_priority = self._get_max_packs_priority()
            new_pack_priority = self.packs_meta_data[pack.id]['priority']
            # NOTE(review): 'hash' looks like a typo for 'has' in this name.
            new_pack_hash_max_priority = new_pack_priority == max_priority
            active_pack_priority = 0
            if self.active_pack:
                active_pack_priority = self.packs_meta_data[self.active_pack.id]['priority']
            if new_pack_hash_max_priority and new_pack_priority > active_pack_priority:
                logging.info('pack {} has highest priority'.format(pack_id))
                self.active_pack = pack
            # NOTE(review): nesting reconstructed -- presumably every added
            # pack is marked in-work, not only the newly-active one; confirm.
            pack.set_state_inwork()
            return True
        except KeyError as e:
            # Pack metadata lacks 'priority' -- reject and detach the pack.
            logging.warning('No priority in pack: {}'.format(pack.id))
            pack.set_state_error(str(e))
            self._delete_pack(pack)

    def update_active_pack_meta(self):
        # Advance the active pack's counter by the lowest pack priority; the
        # counter is compared against the pack's own priority when deciding
        # whether to switch (see _need_switch_active_pack_by_priority).
        self.packs_counters[self.active_pack.id] += self._get_min_packs_priority()

    def _get_min_packs_priority(self):
        """Return the lowest 'priority' value among all attached packs."""
        lowest_priority_meta = min(
            self.packs_meta_data.values(), key=lambda x: x['priority']
        )
        return int(lowest_priority_meta['priority'])

    def _get_max_packs_priority(self):
        """Return the highest 'priority' value among all attached packs."""
        # NOTE(review): local name says "lowest" but this is the max entry.
        lowest_priority_meta = max(
            self.packs_meta_data.values(), key=lambda x: x['priority']
        )
        return int(lowest_priority_meta['priority'])

    def _is_same_priority(self):
        """Return True when every attached pack has the same priority.

        Packs whose metadata lacks 'priority' are detached and removed, and
        the KeyError is re-raised.
        """
        try:
            priorities = set([v['priority'] for v in self.packs_meta_data.values()])
        except KeyError as e:
            logging.warning(e)
            # NOTE(review): _delete_pack mutates packs_meta_data while it is
            # being iterated here -- likely RuntimeError in Python 3; confirm.
            for k, v in self.packs_meta_data.items():
                if 'priority' not in v:
                    logging.warning('No priority in pack: {}; {}'.format(k, str(v)))
                    pack = Pack(k)
                    self._delete_pack(pack)
                    pack.remove()
            raise e
        return len(priorities) == 1

    def _get_site_pack_key(self, pack_id):
        # Redis list key holding the queued URLs of one pack of this site.
        return 'sp_{}_{}'.format(self.domain, pack_id)

    def _get_not_cached_url(self):
        """Pop URLs from the active pack's redis queue until one is not cached.

        Returns the first URL whose page content is not cached (counting it
        as queued), or None when the queue is empty.  Cached URLs are counted
        separately and, when the pack defines parsing rules, re-parsed from
        cache via parse_content_wrapper.
        """
        while True:
            url = redis.lpop(self.site_pack_key)
            if url is None:
                # Queue exhausted.
                return url
            elif not Page(url).content_cached(self.pack_meta['cache_days']):
                redis.hincrby(self.active_pack.key, 'urls_queued', 1)
                return url
            else:
                redis.hincrby(self.active_pack.key, 'urls_from_cache', 1)
                # Content already cached -- re-parse it if the pack defines
                # any extraction rules.
                if self.pack_meta['patterns'] or self.pack_meta['occurences'] or self.pack_meta.get('css_selectors', ()):
                    parse_content_wrapper(url, self.pack_meta)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment