Created
October 24, 2014 02:05
-
-
Save Lupino/90b51a07f3069ae498ee to your computer and use it in GitHub Desktop.
页面中图片提取算法 (algorithm for extracting the main images from a web page)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library imports first, third-party afterwards.
import hashlib
import os
import re
from urllib.parse import parse_qs, urljoin, urlparse

from bs4 import BeautifulSoup

# Public API of this module.
__all__ = ['SureImage']
def md5sum(parent):
    '''
    Return the hex MD5 digest of the concatenated ancestor-chain
    elements (used as a fingerprint of an element's DOM location).
    '''
    digest = hashlib.md5()
    for element in parent:
        digest.update(element.encode('utf-8'))
    return digest.hexdigest()
def parse_title(orig_title):
    '''
    Strip SEO boilerplate from a page title.

    Heuristics:
      * breadcrumb style 'site > section > page' -> keep the last piece
      * 'page _ site' / 'page - site' / 'page | site' -> keep the first
      * otherwise split on common separator punctuation and keep the first

    :param orig_title: raw <title> text.
    :returns: the cleaned title string.
    '''
    # Raw strings avoid invalid-escape warnings on modern Python; the
    # original pattern '[>»]|»' had a redundant '»' alternative.
    if re.search(r'[>»]', orig_title):
        # Breadcrumb-style title: the page name is the last crumb.
        parts = re.sub(r'[>»]', '|', orig_title).split('|')
        title = parts[-1].strip()
    elif re.search(r'[_\-|]', orig_title):
        # 'page - site' style: the page name comes first.
        parts = re.sub(r'[_\-]', '|', orig_title).split('|')
        title = parts[0].strip()
    else:
        # Fall back to splitting on any separator punctuation.
        title = re.sub(r'[-_<>«»~&;!!]', '|', orig_title)
        title = re.sub(r'\|+', '|', title)
        title = title.split('|')[0]
    return title
class Image(object):
    '''
    A candidate content image found on the page, together with the
    heuristic score assigned to it.
    '''
    __slots__ = ['attrs', 'score', 'src', 'title', 'orig_title', 'link',
                 'parent', 'elem_count', 'parent_length']

    def __init__(self, orig_title, title, link):
        self.orig_title = orig_title
        self.title = title
        self.link = link
        # Attribute map recorded while parsing the <img> element.
        self.attrs = {}
        # Ancestor element descriptors, outermost first.
        self.parent = []
        # Heuristic score; higher means more likely the main image.
        self.score = 0
        self.src = ''

    def add_info(self):
        '''Cache the ancestor-chain features used by the KNN matcher.'''
        chain = '>'.join(self.parent)
        self.elem_count = len(self.parent)
        self.parent_length = len(chain)

    def __str__(self):
        return self.src
class Link(object):
    '''
    A candidate navigation/pagination link found on the page, together
    with the heuristic score assigned to it.
    '''
    __slots__ = ['href', 'score', 'rel', 'parent', 'text', 'count',
                 'elem_count', 'parent_length']

    def __init__(self):
        self.href = ''
        self.rel = ''
        self.text = ''
        # Ancestor element descriptors, outermost first.
        self.parent = []
        # Heuristic score; higher means more likely a pagination link.
        self.score = 0
        # How many times this exact href was seen on the page.
        self.count = 1

    def add_info(self):
        '''Cache the ancestor-chain features used by the KNN matcher.'''
        chain = '>'.join(self.parent)
        self.elem_count = len(self.parent)
        self.parent_length = len(chain)

    def __str__(self):
        return self.href
class SureImage(object):
    '''
    Heuristic extractor for the main image(s) and pagination links of
    an HTML page.

    Candidate <img> elements and <a> links are scored with hand-tuned
    regex/URL/DOM heuristics; the best candidates are then selected
    either directly by score or via a small KNN match on the feature
    vector (elem_count, parent_length, score).
    '''
    # NOTE: despite the name, a match here *lowers* an image score
    # (these are thumbnail / static-asset markers).
    _re_positive = re.compile(
        r'/imgs/|static|blank|\?|&|%|slt|medium|'
        r'lib|mobile|small', re.I)
    # Hard-reject markers (logos, avatars, UI chrome, templates, ...).
    _re_reject = re.compile(
        r'logo|about|avatar|thum|skin|css|js|face|suolue|login|small|data:|'
        r'previews|next|themes?|plugins?|ad|qrcode|scripts?|'
        r'load(?:ing|_\d{0,3})|[\{\}]|icons?|styles?|templ(ate|et)s?|tps|'
        r'assets|[ts]_?\d+(jpe?g|png|gif)$|\?|ui', re.I)
    # NOTE: despite the name, a match here *raises* an image score
    # (markers of full-size uploaded article images).
    _re_negative = re.compile(
        r'wp-content|/\d{2,6}/|big|larger?|articles?|uploads?|post|origin',
        re.I)
    # Pagination URL patterns such as 'page=2' or '_3.html'.
    _re_pagelink = re.compile(
        r'p[ag]{0,2}(?:e|ing|ination)?[=/]\d{1,3}|_\d{1,3}\.html?', re.I)
    # "next page" link text (English / German / Chinese).
    _re_nextlink = re.compile(
        r'[>»]|continue|next|weiter(?:[^\|]|$)|下一(页|张)?', re.I)
    # "previous page" link text.
    _re_prevlink = re.compile(
        r'[<«]|earl|new|old|prev|上一(页|张)?', re.I)
    # Bare page numbers used as link text.
    _re_pagenum = re.compile(r'^\d{1,3}$')
    # Search-engine hosts whose links are never content links.
    _re_search_eng = re.compile(
        r'google.com|baidu.com|soso.com|yahoo.com|sogou.com|youdao.com'
        r'|so.com|qihoo.com', re.I)

    __slots__ = ['_soup', 'imgs', 'links', '_base_url', '_base_name',
                 '_base_path', '_base_host', '_query', '_re_text',
                 '_orig_title', '_title']

    def __init__(self, base_url, soup):
        '''
        :param base_url: URL the page was fetched from; used to resolve
            relative image/link URLs.
        :param soup: a BeautifulSoup tree, or raw HTML markup to parse.
        '''
        if not isinstance(soup, BeautifulSoup):
            soup = BeautifulSoup(soup)
        self._soup = soup
        self.imgs = []
        self.links = []
        self._base_url = base_url
        p = urlparse(self._base_url)
        self._base_name = os.path.basename(p.path)
        self._base_path = os.path.dirname(p.path)
        self._base_host = p.netloc
        self._query = parse_qs(p.query)
        self.fetch_title()

    @staticmethod
    def _is_nofollow(rel):
        '''
        True if a rel attribute marks the link as nofollow/unfollow.

        bs4 returns multi-valued attributes such as ``rel`` as a list
        of tokens; the original ``rel in ['unfollow', 'nofollow']``
        compared the whole list against strings and never matched.
        '''
        if not rel:
            return False
        if isinstance(rel, str):
            rel = [rel]
        return bool({'nofollow', 'unfollow'} & set(rel))

    def fetch_title(self):
        '''
        Extract the page title, compile a regex of its fragments (used
        later to match <img> alt/title text), and derive a cleaned
        title.  On pages with a viewport meta tag (mobile layouts)
        prefer the first h1/h2/h3 heading, falling back to any element
        whose attributes mention 'title'.
        '''
        elem = self._soup.find('title')
        self._orig_title = ''
        if elem:
            self._orig_title = elem.get_text().strip()
        # Turn the title into an alternation of its fragments, e.g.
        # 'Foo - Bar' -> 'Foo|Bar', so any single fragment can match.
        re_text = re.sub(
            r'[-_,,<>«» ~)(【】\[\]\(\)&;\*\?\s\+]', '|', self._orig_title)
        re_text = re.sub(r'\|+', '|', re_text)
        try:
            self._re_text = re.compile(re_text)
        except re.error:
            # The title may still contain regex metacharacters; fall
            # back to no fragment matching rather than crash.
            # (Original used a bare ``except:``.)
            self._re_text = None
        self._title = parse_title(self._orig_title)
        elem = self._soup.find('meta', {"name": "viewport"})
        if elem:
            tags = ['h1', 'h2', 'h3']
            is_find = False
            for tag in tags:
                elems = self._soup.find_all(tag)
                if elems:
                    self._title = elems[0].get_text()
                    is_find = True
                    break
            if not is_find:
                elem = self._soup.find(True, re.compile('title'))
                if elem:
                    self._title = elem.get_text()

    def _score_by_src(self, img):
        '''Adjust img.score based on its src URL and ancestor depth.'''
        uri = img.src
        u = urlparse(uri)
        if len(u.path) < 20:
            img.score -= 50
        filename = os.path.basename(u.path)
        if len(filename) < 10:
            if re.match(r'^\d+\.(jpe?g|png|gif)$', filename, re.I):
                # Plain numeric names are typical of uploaded photos.
                img.score += 20
            elif re.match(
                    r'^[ts]_?\d{0,3}\.(jpe?g|png|gif)$', filename, re.I):
                # 't_' / 's_' prefixes are thumbnail naming conventions.
                img.score -= 300
            else:
                img.score -= 30
        if self._re_positive.search(uri):
            img.score -= 25
        if self._re_negative.search(uri):
            img.score += 15
        # Longer URLs tend to be real content images; deep DOM nesting
        # is penalized slightly.
        img.score += len(img.src) * 1.4
        img.score -= len(img.parent) * 0.2
        img.score = int(img.score)

    def _push_img(self, img, imgs):
        '''
        Append ``img`` to ``imgs`` unless its src was already collected
        (a repeated src is probably decoration: the existing entry is
        penalized and the duplicate dropped).  Fresh entries matching
        the reject pattern (and not the uploaded-image pattern) are
        penalized before being appended.
        '''
        for other in imgs:
            if other.src == img.src:
                other.score -= 300
                return
        if self._re_reject.search(img.src) and \
                not self._re_negative.search(img.src):
            img.score -= 300
        imgs.append(img)

    def fetch_imgs(self):
        '''
        Collect and score every <img> element on the page into
        ``self.imgs``.
        '''
        for elem in self._soup.find_all('img'):
            a = elem.find_parent('a')
            if a and self._is_nofollow(a.get('rel')):
                continue
            img = Image(self._orig_title, self._title, self._base_url)
            for name, value in elem.attrs.items():
                self.parse_img_attrs(img, name, value)
            img.parent = self.parse_parent(elem)
            img.add_info()
            parent = ' '.join(img.parent)
            # Sidebar / list containers rarely hold the main image;
            # content containers usually do.
            if re.search('side|ul|li|commit', parent, re.I):
                img.score -= 100
            elif re.search('content|main|p|container', parent, re.I):
                img.score += 100
            # Prefer the first recorded attribute value (e.g. a data-*
            # lazy-load attribute) that is a clean absolute URL, with
            # any fragment stripped, as the image source.
            maybe = []
            for _, val in img.attrs.items():
                m = re.search(r'^([^#]+)#', val)
                if m:
                    val = m.group(1)
                if re.match(r'''^https?://[^"']+$''', val, re.I):
                    maybe.append(val)
            if maybe:
                img.src = maybe[0]
            if a:
                href = urljoin(self._base_url, a.get('href'))
                if self._re_search_eng.search(href):
                    continue
                if re.match(r'^https?://[^/]+/[^\?]+(jpe?g|png|gif)$', href):
                    if img.src == href:
                        # <a> wrapping the same image: strong signal.
                        img.score += 500
                    else:
                        # The anchor points at a (probably full-size)
                        # image; prefer it.
                        img.src = href
                        img.score += 100
                # Collect caption-like anchor attributes into the title.
                captions = []
                for name, value in a.attrs.items():
                    if name.startswith("data-") or \
                            name in ["title", "alt", "caption"]:
                        if value and not re.search("点击|浏览|https?://|^/",
                                                   value):
                            captions.append((name, value))
                img.title = ','.join([v[1] for v in captions] + [img.title])
            self._score_by_src(img)
            self._push_img(img, self.imgs)

    def fetch_script_imgs(self):
        '''
        Collect and score image URLs embedded in <script> bodies.

        :returns: a list of scored Image objects (not merged into
            ``self.imgs``).
        '''
        retval = []
        for script in self._soup.find_all('script'):
            text = script.get_text()
            links = re.findall(r'https?://[^/]+/[^"\']+', text, re.I)
            for link in links:
                if re.search(r'^https?://[^/]+/.+(jpe?g|png|gif)$', link,
                             re.I):
                    img = Image(self._orig_title, self._title,
                                self._base_url)
                    img.src = link
                    img.parent = self.parse_parent(script)
                    img.add_info()
                    img.parent.append('script')
                    # Script-embedded images start with a base score,
                    # since no DOM attributes can be inspected.
                    img.score = 50
                    self._score_by_src(img)
                    self._push_img(img, retval)
        return retval

    def fetch_links(self):
        '''
        Collect and score every <a> element on the page into
        ``self.links`` (scoring is oriented toward finding pagination
        links for the current page).
        '''
        for elem in self._soup.find_all('a'):
            if self._is_nofollow(elem.get('rel')):
                continue
            link = Link()
            for name, value in elem.attrs.items():
                if name == 'href':
                    link.href = urljoin(self._base_url, value)
                    # Drop the URL fragment.
                    idx = link.href.rfind('#')
                    if idx > -1:
                        link.href = link.href[:idx]
                    if self._re_pagelink.search(value):
                        link.score += 10
                    p = urlparse(link.href)
                    path = os.path.dirname(p.path)
                    if self._base_path == path:
                        # Same directory as the current page.
                        link.score += 10
                    # Progressively strip extension and trailing _N / -N
                    # chunks off the current file name and reward links
                    # that share the remaining stem.
                    fn = self._base_name
                    idx = fn.rfind('.')
                    while idx > -1:
                        fn = fn[:idx]
                        if value.find(fn) > -1:
                            link.score += 30
                            break
                        for q in '_-':
                            idx = fn.rfind(q)
                            if idx > -1:
                                break
                    if p.query:
                        query = parse_qs(p.query)
                        for k, val in query.items():
                            if self._query.get(k):
                                # Shares a query parameter name with the
                                # current page...
                                link.score += 10
                                if self._query[k] == val:
                                    # ...and the same value.
                                    link.score += 30
                    if self._base_host == p.netloc:
                        link.score += 20
                if name == 'id' or name == 'class':
                    if name == 'class':
                        value = ' '.join(value)
                    if re.search('avatar', value, re.I):
                        link.score -= 50
                if re.search('^on+', name):
                    # Inline event handlers are usually UI widgets, not
                    # navigation; 'window.close' handlers doubly so.
                    link.score -= 100
                    if re.search(r'window\.close', value):
                        link.score -= 300
            if self._re_search_eng.search(link.href):
                continue
            if link.score < -100:
                continue
            if re.search('^https?://', link.href):
                push = True
                for _link in self.links:
                    if _link.href == link.href:
                        # Duplicate href: just bump its counter.
                        _link.count += 1
                        push = False
                        break
                if push:
                    link.parent = self.parse_parent(elem)
                    link.add_info()
                    text = elem.get_text().strip()
                    if self._re_pagenum.search(text):
                        link.score += 15
                    elif self._re_nextlink.search(text):
                        link.score += 20
                    elif self._re_prevlink.search(text):
                        link.score += 10
                    elif 2 < len(text) < 6:
                        link.text = text
                    self.links.append(link)

    def parse_parent(self, elem):
        '''
        Describe the ancestor chain of ``elem`` as a list of
        'tag#id.class' strings, outermost first, with structural /
        neutral tags filtered out.
        '''
        retval = []
        for p in elem.parents:
            e = p.name
            idx = p.get('id')
            if idx:
                e += '#%s' % idx
            cls = p.get('class')
            if cls:
                cls = '.'.join(cls)
                e += '.%s' % cls
            retval.insert(0, e)
        ignores = '[document] html body div img br link table tr td center'
        for e in ignores.split(' '):
            while e in retval:
                retval.remove(e)
        return retval

    def parse_img_attrs(self, img, name, value):
        '''
        Score a single <img> attribute; unrecognized attributes (mostly
        data-* lazy-load ones) get their resolved value recorded in
        ``img.attrs`` for the later source-URL search.
        '''
        if value is None:
            return
        if type(value) == list:
            value = ' '.join(value)
        value = value.strip()
        if len(value) == 0:
            return
        if name == 'width' or name == 'height':
            m = re.search(r'\d+', value)
            if m:
                value = int(m.group())
                if value > 200:
                    img.score += 10
                else:
                    # Small declared dimensions: icon / thumbnail.
                    img.score -= 100
        elif name == 'alt' or name == 'title':
            if (self._re_text and self._re_text.search(value)) or \
                    self._orig_title.find(value) > -1:
                # The alt/title echoes the page title: strong signal.
                img.score += 100
                img.title = parse_title(value)
            else:
                img.score += 5
        elif name == 'src':
            if self._base_url:
                value = urljoin(self._base_url, value)
            img.src = value
        elif name == 'id' or name == 'class':
            img.score += 5
        elif name == 'style':
            # Inline width/height styles score like the attributes.
            vals = value.split(';')
            for val in vals:
                val = val.strip().lower()
                m = re.search(r'^(width|height): ?(\d+)px', val)
                if m:
                    size = int(m.group(2))
                    if size > 200:
                        img.score += 10
                    else:
                        img.score -= 100
        elif name == 'border':
            img.score -= 3
        elif re.match(r'^on\w+/', name):
            # NOTE(review): the trailing '/' looks unintended (attribute
            # names never contain '/'), so this branch never fires;
            # kept as-is to preserve scoring behavior.
            img.score += 10
        else:
            if name.find('data-') > -1:
                # data-* lazy-load attributes often hold the real URL.
                img.score += 10
            else:
                img.score += 5
            if re.search(r'''^(?:https?\:)//[^"']+(jpe?g|png|gif)''',
                         value, re.I):
                img.score += 150
            elif re.search('img|photo', value, re.I):
                img.score += 75
            elif re.search(r'\.(jpe?g|png|gif)', value, re.I):
                img.score += 125
            value = urljoin(self._base_url, value)
            img.attrs[name] = value

    def get_imgs(self):
        '''
        Return the collected images with a non-empty src, sorted by
        score (best first).  Fetches and caches on first call.
        '''
        if not self.imgs:
            self.fetch_imgs()
            self.imgs = list(filter(lambda x: x.src, self.imgs))
            self.imgs = sorted(self.imgs, key=lambda x: x.score,
                               reverse=True)
        return self.imgs

    def get_sure_imgs(self, diff=0.01):
        '''
        Return the top-scored image plus any image that shares its
        ancestor chain or whose normalized score is within ``diff`` of
        the top score.
        '''
        imgs = self.get_imgs()[:]
        if len(imgs) == 0:
            return []
        score = imgs[0].score
        parent = md5sum(imgs[0].parent)
        if len(imgs) == 1:
            return imgs
        retval = []
        min_score = imgs[-1].score
        # Renamed from 'range', which shadowed the builtin.
        span = score - min_score
        for img in imgs:
            key = md5sum(img.parent)
            if key == parent or \
                    1 - (img.score - min_score + 0.001)/(span + 0.001) < diff:
                retval.append(img)
        return retval

    @staticmethod
    def _knn(samples, test, diff):
        '''
        Min-max normalize the sample feature vectors and the test
        vector, then return the (item, squared_distance) pairs whose
        distance is within ``diff``, closest first.

        A small 0.001 smoothing term avoids division by zero when a
        feature has no spread across the samples.
        '''
        if not samples:
            return []
        length = len(test)
        min_vals = list(samples.values())[0][:]
        max_vals = list(samples.values())[0][:]
        for val in samples.values():
            for i in range(length):
                if min_vals[i] > val[i]:
                    min_vals[i] = val[i]
                if max_vals[i] < val[i]:
                    max_vals[i] = val[i]
        ranges = [(a - b) for a, b in zip(max_vals, min_vals)]
        for val in samples.values():
            for i in range(length):
                val[i] = (val[i] - min_vals[i] + 0.001)/(ranges[i] + 0.001)
        test = test[:]
        for i in range(length):
            test[i] = (test[i] - min_vals[i] + 0.001)/(ranges[i] + 0.001)
        distances = {}
        for item, sample in samples.items():
            distances[item] = sum([(a - b)**2 for a, b in zip(test, sample)])
        retval = []
        for pair in sorted(distances.items(), key=lambda x: x[1]):
            if pair[1] > diff:
                break
            retval.append(pair)
        return retval

    def get_same_imgs(self, elem_count, parent_length, score, diff=0.05):
        '''
        KNN-match the page images (including script-embedded ones)
        against the given feature vector.

        :returns: a list of (Image, squared_distance) pairs.
        '''
        imgs = self.get_imgs()[:]
        imgs.extend(self.fetch_script_imgs())
        if len(imgs) == 0:
            return []
        samples = {}
        for img in imgs:
            samples[img] = [img.elem_count, img.parent_length, img.score]
        return self._knn(samples, [elem_count, parent_length, score], diff)

    def get_links(self, is_all=False, min_score=20):
        '''
        Return the collected links sorted by score (best first).

        :param is_all: return every link regardless of score.
        :param min_score: minimum score kept in the filtered result.
        '''
        if not self.links:
            self.fetch_links()
            self.links = sorted(self.links, key=lambda x: x.score,
                                reverse=True)
        if is_all:
            return self.links
        if self.links and self.links[0].score < 0:
            return []
        links = []
        for link in self.links:
            if link.score >= min_score:
                links.append(link)
        return links

    def get_same_links(self, elem_count, parent_length, score, diff=0.05):
        '''
        KNN-match the page links against the given feature vector.

        Fixed vs. original: empty-input guard and the same 0.001
        division smoothing as get_same_imgs (the original raised
        IndexError on no links / ZeroDivisionError when a feature had
        no spread).

        :returns: a list of (Link, squared_distance) pairs.
        '''
        links = self.get_links(True)[:]
        if len(links) == 0:
            return []
        samples = {}
        for link in links:
            samples[link] = [link.elem_count, link.parent_length, link.score]
        return self._knn(samples, [elem_count, parent_length, score], diff)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment