Skip to content

Instantly share code, notes, and snippets.

@askabelin
Created June 30, 2015 21:14
Show Gist options
  • Save askabelin/9688176a16e40bc3207d to your computer and use it in GitHub Desktop.
# -*- coding: utf8 -*-
from datetime import datetime, timedelta
import json
from libs.common.Date import Date
from libs.common.os_tools import create_file, remove_file, create_gzip, \
read_gzip
from libs.common.string import md5, encode_from_page_charset, hash_string
import logging
import os
from kombu.utils import cached_property
import simplejson
import config
class Page(object):
    """On-disk cache of one fetched page plus its parse results.

    Every artifact for a URL lives under a two-level directory derived
    from the MD5 of the URL (keeps any single directory small) and is
    stored gzip-compressed:

        {PAGES_PATH}/{hash[:2]}/{hash[2:4]}/{hash}.html.gz          content
        .../{hash}.meta.json.gz                                     fetch metadata
        .../{hash}_{md5(key)}.(parse|occurence|css_selector).json.gz  results
        .../copies/{hash}.copy.{pack_id}.html                       plain copies
    """

    def __init__(self, url, id=None):
        """
        :param url: page URL; its MD5 keys every file path below.
        :param id: optional external task identifier, echoed in results.
        """
        self.url = url
        self.url_hash = md5(url)
        self.id = id

    def __str__(self):
        return u'{} ({})'.format(self.url, self.url_hash)

    @property
    def folder_path(self):
        # Build the path static/pages/{hash[:2]}/{hash[2:4]}.
        return os.path.join(
            config.PAGES_PATH, self.url_hash[:2], self.url_hash[2:4]
        )

    @property
    def content_path(self):
        # Gzipped raw HTML of the page.
        return os.path.join(self.folder_path, self.url_hash + '.html.gz')

    @property
    def meta_path(self):
        # Gzipped JSON fetch metadata (status code, headers, errors, ...).
        return os.path.join(self.folder_path, self.url_hash + '.meta.json.gz')

    def get_copy_path(self, pack_id):
        """Path of the uncompressed per-pack page copy."""
        return os.path.join(
            self.folder_path, 'copies', self.url_hash + '.copy.{}.html'.format(pack_id)
        )

    def save_copy(self, pack_id, content):
        """Store an uncompressed copy of the page for the given pack."""
        create_file(self.get_copy_path(pack_id), content, force=True)

    def get_copy_url(self, pack_id):
        """Public URL of the per-pack copy: BASE_URL + project-relative path."""
        rel_path = os.path.relpath(self.get_copy_path(pack_id), config.PROJECT_PATH)
        return '{}{}'.format(config.BASE_URL, rel_path)

    def get_content_url(self):
        """Public URL of the gzipped page content."""
        rel_path = os.path.relpath(self.content_path, config.PROJECT_PATH)
        return '{}{}'.format(config.BASE_URL, rel_path)

    def get_content(self):
        """Read the stored page body, re-encoded from the page's charset."""
        content = read_gzip(self.content_path)
        return encode_from_page_charset(content)

    def save_content(self, content):
        """Persist the page body gzip-compressed, replacing any old copy."""
        create_gzip(self.content_path, content, force=True)

    def save_metadata(self, data):
        """Persist fetch metadata as gzipped JSON, replacing any old copy."""
        create_gzip(self.meta_path, json.dumps(data), force=True)

    def clear_page_data(self):
        """Delete every stored artifact (content, meta, results) of this page."""
        if not os.path.exists(self.folder_path):
            return
        for name in os.listdir(self.folder_path):
            if name.startswith(self.url_hash):
                # Fix: the original passed the bare filename to remove_file(),
                # which resolved it against the CWD instead of folder_path,
                # so the page's files were never actually removed.
                remove_file(os.path.join(self.folder_path, name))

    @cached_property
    def content_date(self):
        # mtime of the stored content, or None if never fetched.
        if os.path.exists(self.content_path):
            return datetime.fromtimestamp(os.path.getmtime(self.content_path))
        return None

    @cached_property
    def meta_date(self):
        # mtime of the stored metadata, or None if absent.
        if os.path.exists(self.meta_path):
            return datetime.fromtimestamp(os.path.getmtime(self.meta_path))
        return None

    def content_cached(self, days_delta=0):
        """Return True if stored content exists and is at most days_delta days old."""
        if not days_delta or not self.content_date:
            return False
        # days= spelled out for clarity (original relied on the positional arg).
        return datetime.now() - self.content_date <= timedelta(days=days_delta)

    def is_failed(self, days_delta=0, minutes_delta=0):
        """Return True if the last recorded fetch error is within the window."""
        if not (days_delta or minutes_delta):
            return False
        if not os.path.exists(self.meta_path):
            return False
        meta = self.get_meta_data()
        if 'error_datetime' not in meta:
            # Fix: the original fell off the end and implicitly returned
            # None here; make the boolean explicit like the other paths.
            return False
        error_datetime = Date.get_datetime_ms(meta['error_datetime'])
        return datetime.now() - error_datetime <= timedelta(
            days=days_delta, minutes=minutes_delta
        )

    def _get_parse_params(self, pack_meta, key):
        # Pack-level params win; otherwise fall back to the per-URL params
        # keyed by hash_string(url) in 'parse_params_per_url'.
        params = pack_meta.get('parse_params_per_url', {}).get(
            str(hash_string(self.url)), {}
        )
        return pack_meta.get(key, ()) or params.get(key, ())

    def get_result(self, pack, include_url=False):
        """Assemble the full result dict for this page within `pack`.

        :param pack: object with `.data` (pack settings dict) and `.id`.
        :param include_url: include the page URL inside result['task'].
        :returns: dict with 'task', 'parse_result', 'occurences_result',
                  'css_selectors_result', 'meta', and optional copy/content
                  URLs depending on pack settings.
        """
        parse_result = self.get_parse_result(
            self._get_parse_params(pack.data, 'patterns'))
        occurences_result = self.get_occurences_result(
            self._get_parse_params(pack.data, 'occurences'))
        css_selectors_result = self.get_css_selectors_result(
            self._get_parse_params(pack.data, 'css_selectors'))
        meta_data = self.get_meta_data()
        # Headers are only returned when the pack explicitly asked for them.
        if not pack.data.get('headers'):
            meta_data.pop('headers', None)
        meta_data['timestamp'] = self.meta_date.isoformat() if self.meta_date else None
        task_data = {'id': self.id}
        if include_url:
            task_data['url'] = self.url
        page_result = {
            'task': task_data,
            'parse_result': parse_result,
            'occurences_result': occurences_result,
            'css_selectors_result': css_selectors_result,
            'meta': meta_data,
        }
        if pack.data.get('save_page_copy', 0):
            page_result['copy_url'] = self.get_copy_url(pack.id)
        if pack.data.get('content_url', 0):
            page_result['content_url'] = self.get_content_url()
        # No status code and no recorded error means the fetch never happened.
        if not (page_result['meta'].get('status_code') or page_result['meta'].get('error')):
            page_result['meta']['error'] = 'no data'
        return page_result

    def save_parse_result(self, pattern, result):
        """Persist the match list of one pattern as gzipped JSON."""
        filepath = self.get_pattern_result_path(pattern)
        # json (not simplejson) for consistency with save_metadata().
        create_gzip(filepath, json.dumps(result), force=True)

    def get_parse_result(self, patterns):
        """Load one result list per pattern ([] where no result file exists)."""
        parse_result = []
        for pt in patterns:
            filepath = self.get_pattern_result_path(pt)
            pattern_result = []
            if os.path.exists(filepath):
                content = read_gzip(filepath)
                pattern_result = json.loads(content)
            parse_result.append(pattern_result)
        return parse_result

    def save_occurence_result(self, occurence):
        """Mark an occurrence as found: presence of the file is the flag."""
        filepath = self.get_occurence_result_path(occurence)
        create_gzip(filepath, 'found', force=True)

    def get_occurences_result(self, occurences):
        """Return 1/0 per occurrence depending on whether its flag file exists."""
        result = []
        for o in occurences:
            filepath = self.get_occurence_result_path(o)
            result.append(int(os.path.exists(filepath)))
        return result

    def save_css_selector_result(self, css_selector, result):
        """Persist the extraction result of one CSS selector as gzipped JSON."""
        filepath = self.get_css_selector_result_path(css_selector)
        # json (not simplejson) for consistency with save_metadata().
        create_gzip(filepath, json.dumps(result), force=True)

    def get_css_selectors_result(self, css_selectors):
        """Load one result list per selector ([] where no result file exists)."""
        parse_result = []
        for s in css_selectors:
            filepath = self.get_css_selector_result_path(s)
            css_selector_result = []
            if os.path.exists(filepath):
                content = read_gzip(filepath)
                css_selector_result = json.loads(content)
            parse_result.append(css_selector_result)
        return parse_result

    def get_pattern_result_path(self, pattern):
        """Result file path for a pattern, keyed by md5(pattern)."""
        return os.path.join(
            self.folder_path,
            self.url_hash + '_' + md5(pattern) + '.parse.json.gz'
        )

    def get_occurence_result_path(self, occurence):
        """Flag file path for an occurrence, keyed by md5(occurence)."""
        return os.path.join(
            self.folder_path,
            self.url_hash + '_' + md5(occurence) + '.occurence.json.gz'
        )

    def get_css_selector_result_path(self, css_selector):
        """Result file path for a CSS selector, keyed by md5(str(selector))."""
        return os.path.join(
            self.folder_path,
            self.url_hash + '_' + md5(str(css_selector)) + '.css_selector.json.gz'
        )

    def get_meta_data(self):
        """Load stored metadata as a dict; {} if absent or corrupted.

        A file that fails to parse is logged and deleted so the next fetch
        starts clean.
        """
        if os.path.exists(self.meta_path):
            try:
                content = read_gzip(self.meta_path)
                return json.loads(content)
            except ValueError as e:
                logging.warning(e)
                remove_file(self.meta_path)
        return {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment