Created
July 19, 2020 08:23
-
-
Save Sg4Dylan/96a35419ac4550ad76de0c46aea35075 to your computer and use it in GitHub Desktop.
使用 Ascii2D 批量匹配可能存在于 Pixiv 的 Twitter 图片
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import re
import urllib
import urllib.error  # needed: urllib.error.HTTPError is caught in the download retry loop

import requests
import wget
from bs4 import BeautifulSoup
from tqdm import tqdm

# For running inference
import numpy as np
import onnxruntime
from PIL import Image
# HTTP session routed through a local proxy (Ascii2D/Twitter may be unreachable directly).
session = requests.Session()
session.proxies.update({
    'http': 'http://127.0.0.1:1080',
    'https': 'http://127.0.0.1:1080'
})

# Ascii2D reverse-image-search endpoint and the Twitter media CDN prefix.
api = 'https://ascii2d.net/search/url/'
pbs = 'https://pbs.twimg.com/media/'

# OFA model: https://github.com/Sg4Dylan/EfficientIR/tree/master/models
onnx_session = onnxruntime.InferenceSession('ofa-sim.onnx', None)
model_input = onnx_session.get_inputs()[0].name
# Model's fixed square input size in pixels.
img_size = 260

# Resume support: filenames already processed in previous runs.
record = []
if os.path.exists('record.json'):
    # BUGFIX: the original read via a bare open() and never closed the handle;
    # use a context manager and json.load instead of read()+json.loads.
    with open('record.json', 'r', encoding='utf-8') as fp:
        record = json.load(fp)
def get_match_list(fname):
    """Query Ascii2D for a Twitter media filename and collect pixiv candidates.

    Builds the pbs.twimg.com ':orig' URL for *fname*, submits it to Ascii2D's
    URL-search endpoint, and scrapes at most the first 5 result boxes for
    entries that mention pixiv.

    Returns a list of {'thumbnail_url', 'original_url'} dicts (possibly
    empty), or None when Ascii2D reports it could not download the image.
    """
    r = session.get(f'{api}{pbs}{fname}:orig')
    # Ascii2D's error page contains "failed to download the file" (Japanese).
    if 'ファイルのダウンロードに失敗しました' in r.text:
        print('\nAscii2D 返回: 下载文件失败')
        return None

    soup = BeautifulSoup(r.text, 'html.parser')
    target_div = soup.find_all("div", {"class": "row item-box"})
    thumbnail_pattern = '<img alt=.*? loading="lazy" src="(.*?)"'
    original_pattern = '<a href="(.*?)" rel="noopener" target="_blank">.*?</a>'

    results = []
    # Only the first 5 result boxes are worth inspecting.
    for box in target_div[:5]:
        html = str(box)
        if 'pixiv' not in html:
            continue
        # BUGFIX: re.findall() returns a list, never None, so the original
        # `if re_result is not None:` was always true and `re_result[0]`
        # raised IndexError whenever the pattern did not match. Skip empty
        # match lists instead.
        thumb_match = re.findall(thumbnail_pattern, html)
        if not thumb_match:
            continue
        orig_match = re.findall(original_pattern, html)
        if not orig_match:
            continue
        results.append({
            'thumbnail_url': thumb_match[0],
            'original_url': orig_match[0]
        })
    return results
def check_if_match(original_img, thumbnail_list):
    """Compare a local image against Ascii2D thumbnail candidates.

    Downloads each candidate thumbnail, embeds both images with the OFA ONNX
    model, and scores similarity via a tanh-squashed L2 distance.

    Returns the pixiv URL of the first candidate scoring > 90, else None.
    """

    def img_preprocess(image_path):
        # Resize to the model's fixed input size and normalize with the
        # standard ImageNet mean/stddev. Returns a (1, 3, H, W) float32
        # batch, or None when the image file is unreadable/corrupt.
        try:
            img = Image.open(image_path).resize(
                (img_size, img_size), Image.BICUBIC).convert('RGB')
        except OSError:
            print(f'\nFile broken: {image_path}')
            return None
        img_data = np.array(img).transpose(2, 0, 1).astype('float32')
        mean_vec = np.array([0.485, 0.456, 0.406])
        stddev_vec = np.array([0.229, 0.224, 0.225])
        norm_img_data = np.zeros(img_data.shape).astype('float32')
        for c in range(img_data.shape[0]):
            norm_img_data[c, :, :] = (img_data[c, :, :] / 255 - mean_vec[c]) / stddev_vec[c]
        # Add the batch dimension expected by the ONNX model.
        return norm_img_data.reshape(1, 3, img_size, img_size).astype('float32')

    def get_fv(image_path):
        # Feature vector from the ONNX model, or None if preprocessing failed.
        norm_img_data = img_preprocess(image_path)
        if norm_img_data is None:
            return None
        return onnx_session.run([], {model_input: norm_img_data})[0]

    def calc_sim(fv1, fv2):
        # Squashed L2 distance mapped onto a 0-100 similarity percentage.
        q = sum((fv1 - fv2) ** 2)
        return (1 - np.tanh(q / 3000)) * 100

    def download_thumbnail(img_url, temp_name):
        # Retry up to 3 times; Ascii2D thumbnails occasionally fail with HTTP errors.
        # Example search URL:
        # https://ascii2d.net/search/url/https://pbs.twimg.com/media/Ec5dMBuUYAAsZGA.jpg:orig
        attempts = 3
        while attempts > 0:
            try:
                wget.download('https://ascii2d.net/' + img_url, temp_name)
                return True
            except urllib.error.HTTPError:
                attempts -= 1
        return False

    # Feature vector of the local (original) image.
    fv_orig = get_fv(original_img)
    # BUGFIX: the original passed a None feature vector straight into
    # calc_sim when the local image was unreadable, raising TypeError.
    if fv_orig is None:
        return None

    # Fetch each candidate thumbnail and compare feature vectors.
    for cand in thumbnail_list:
        temp_ext = cand['thumbnail_url'].split('.')[-1]
        temp_name = f'temp_download.{temp_ext}'
        if not download_thumbnail(cand['thumbnail_url'], temp_name):
            print(f'\n {cand} 下载错误')
            continue
        fv_temp = get_fv(temp_name)
        # Remove the temp file before any comparison so it can't be leaked.
        os.remove(temp_name)
        # BUGFIX: skip broken thumbnails instead of crashing in calc_sim.
        if fv_temp is None:
            continue
        fv_sim = calc_sim(fv_orig, fv_temp)
        if fv_sim > 90:
            print(f'\nOriginal: {original_img} -> 匹配到相似度: {fv_sim} %')
            return cand['original_url']
    return None
# Directory containing the Twitter images to look up.
target_path = 'test'
# Final results: [twitter_filename, pixiv_url] pairs.
matched_list = []

# The outer loop restarts the scan after a transient error; files already
# handled are skipped via `record`, so no work is repeated.
while True:
    try:
        for fname in tqdm(os.listdir(target_path), ascii=True):
            # Skip files handled in a previous run (or before the last error).
            if fname in record:
                continue
            # Persist progress immediately so an interruption loses nothing.
            record.append(fname)
            with open('record.json', 'w', encoding='utf-8') as wp:
                json.dump(record, wp)
            # Ask Ascii2D for pixiv candidates for this file.
            thumbnail_list = get_match_list(fname)
            if thumbnail_list is None:
                continue
            # Verify candidates with the neural-network similarity check.
            match_result = check_if_match(f'{target_path}/{fname}', thumbnail_list)
            if match_result is not None:
                matched_list.append([fname, match_result])
                print(f'\nOriginal: {fname} -> 匹配到: {match_result}')
            else:
                print(f'\nOriginal: {fname} -> 未匹配成功!')
        break
    # BUGFIX: the original bare `except:` also swallowed KeyboardInterrupt
    # and SystemExit, making the script impossible to stop with Ctrl-C, and
    # hid the actual error. Catch Exception and report what went wrong.
    except Exception as exc:
        print(f'Internal ERROR: {exc}')

print(matched_list)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment