Skip to content

Instantly share code, notes, and snippets.

@Sg4Dylan
Created July 19, 2020 08:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sg4Dylan/96a35419ac4550ad76de0c46aea35075 to your computer and use it in GitHub Desktop.
Save Sg4Dylan/96a35419ac4550ad76de0c46aea35075 to your computer and use it in GitHub Desktop.
使用 Ascii2D 批量匹配可能存在于 Pixiv 的 Twitter 图片
import os
import re
import json
import urllib
import requests
import wget
from tqdm import tqdm
from bs4 import BeautifulSoup
# Used for running inference
import numpy as np
from PIL import Image
import onnxruntime
# Shared HTTP session; every request made through it goes via the local
# proxy listening on 127.0.0.1:1080.
session = requests.Session()
session.proxies.update({
    'http': 'http://127.0.0.1:1080',
    'https': 'http://127.0.0.1:1080',
})

# Ascii2D "search by URL" endpoint and the Twitter media host; the two are
# concatenated with a file name to build the search request.
api = 'https://ascii2d.net/search/url/'
pbs = 'https://pbs.twimg.com/media/'

# OFA model: https://github.com/Sg4Dylan/EfficientIR/tree/master/models
onnx_session = onnxruntime.InferenceSession('ofa-sim.onnx', None)
# Name of the model's first input, used as the feed key when running it.
model_input = onnx_session.get_inputs()[0].name
# Side length (pixels) that images are resized to before inference.
img_size = 260

# File names that have already been processed, persisted in record.json.
record = []
if os.path.exists('record.json'):
    # Use a context manager so the file handle is closed deterministically.
    with open('record.json', 'rb') as rp:
        record = json.loads(rp.read())
def get_match_list(fname):
    """Ask Ascii2D for images similar to a Twitter media file.

    The search URL is built from the module-level ``api`` and ``pbs``
    prefixes plus ``fname`` with an ``:orig`` suffix, and is fetched through
    the shared ``session``.

    Args:
        fname: File name of the image on the Twitter media host.

    Returns:
        ``None`` when Ascii2D reports that it failed to download the file.
        Otherwise a list (possibly empty) of dicts with the keys
        ``'thumbnail_url'`` and ``'original_url'``, one per pixiv-related
        result among the first five result boxes.
    """
    r = session.get(f'{api}{pbs}{fname}:orig')
    if 'ファイルのダウンロードに失敗しました' in r.text:
        print('\nAscii2D 返回: 下载文件失败')
        return None

    soup = BeautifulSoup(r.text, 'html.parser')
    target_div = soup.find_all("div", {"class": "row item-box"})
    thumbnail_pattern = r'<img alt=.*? loading="lazy" src="(.*?)"'
    original_pattern = r'<a href="(.*?)" rel="noopener" target="_blank">.*?</a>'

    results = []
    # Only the first five result boxes are inspected, whether or not they
    # turn out to be pixiv entries.
    for item in target_div[:5]:
        item_html = str(item)
        if 'pixiv' not in item_html:
            continue
        # Extract the thumbnail link. re.findall returns an empty list (never
        # None) when nothing matches, so test for emptiness before indexing.
        thumbnails = re.findall(thumbnail_pattern, item_html)
        if not thumbnails:
            continue
        # Extract the link to the original work.
        originals = re.findall(original_pattern, item_html)
        if not originals:
            continue
        results.append({
            'thumbnail_url': thumbnails[0],
            'original_url': originals[0],
        })
    return results
def check_if_match(original_img, thumbnail_list):
    """Find the first candidate whose thumbnail looks like the local image.

    Each candidate thumbnail is downloaded from ascii2d.net into a temporary
    file, run through the ONNX model, and compared with the feature vector of
    ``original_img``. The first candidate scoring above 90 wins.

    Args:
        original_img: Path of the local image to compare against.
        thumbnail_list: List of dicts with ``'thumbnail_url'`` (a path
            relative to ``https://ascii2d.net/``) and ``'original_url'``.

    Returns:
        The ``'original_url'`` of the first sufficiently similar candidate,
        or ``None`` when nothing matches or the local image cannot be read.
    """
    # `import urllib` alone does not guarantee the `error` submodule is
    # loaded; import it explicitly where it is used.
    import urllib.error

    def img_preprocess(image_path):
        """Load an image and return a normalised (1, 3, H, W) float32 array."""
        try:
            # The context manager closes the underlying file once the resized
            # copy has been produced, so the file can be deleted afterwards.
            with Image.open(image_path) as src:
                img = src.resize((img_size, img_size), Image.BICUBIC).convert('RGB')
        except OSError:
            print(f'\nFile broken: {image_path}')
            return None
        # HWC -> CHW, then scale to [0, 1] and normalise each channel.
        chw = np.array(img).transpose(2, 0, 1).astype('float32')
        mean_vec = np.array([0.485, 0.456, 0.406], dtype='float32').reshape(3, 1, 1)
        stddev_vec = np.array([0.229, 0.224, 0.225], dtype='float32').reshape(3, 1, 1)
        norm_img_data = (chw / 255 - mean_vec) / stddev_vec
        # Add the batch dimension.
        return norm_img_data.reshape(1, 3, img_size, img_size).astype('float32')

    def get_fv(image_path):
        """Return the model's first output for the image, or None if unreadable."""
        norm_img_data = img_preprocess(image_path)
        if norm_img_data is None:
            return None
        return onnx_session.run([], {model_input: norm_img_data})[0]

    def calc_sim(fv1, fv2):
        """Map the squared L2 distance between two vectors to a 0-100 score."""
        # np.sum reduces over every axis, so the result is a scalar whatever
        # the shape of the model output (builtin sum only collapses axis 0).
        q = np.sum(np.square(fv1 - fv2))
        return (1 - np.tanh(q / 3000)) * 100

    def download_thumbnail(img_url, temp_name):
        """Download a thumbnail to temp_name, trying up to three times."""
        # https://ascii2d.net/search/url/https://pbs.twimg.com/media/Ec5dMBuUYAAsZGA.jpg:orig
        for _ in range(3):
            try:
                wget.download('https://ascii2d.net/' + img_url, temp_name)
                return True
            except urllib.error.URLError:
                # URLError is the parent of HTTPError, so connection failures
                # are retried as well as HTTP error statuses.
                continue
        return False

    def remove_if_exists(path):
        """Delete path when it is present."""
        if os.path.exists(path):
            os.remove(path)

    # Feature vector of the local image; without it nothing can be compared.
    fv_orig = get_fv(original_img)
    if fv_orig is None:
        return None

    # Fetch each thumbnail in turn and compare feature vectors.
    for candidate in thumbnail_list:
        temp_ext = candidate['thumbnail_url'].split('.')[-1]
        temp_name = f'temp_download.{temp_ext}'
        # Clear any stale temp file so the fresh download lands at temp_name
        # and an old image is never compared by mistake.
        remove_if_exists(temp_name)
        if not download_thumbnail(candidate['thumbnail_url'], temp_name):
            print(f'\n {candidate} 下载错误')
            continue
        try:
            fv_temp = get_fv(temp_name)
        finally:
            # Always clean up, even when inference raises.
            remove_if_exists(temp_name)
        if fv_temp is None:
            # Unreadable thumbnail: skip this candidate.
            continue
        fv_sim = calc_sim(fv_orig, fv_temp)
        if fv_sim > 90:
            print(f'\nOriginal: {original_img} -> 匹配到相似度: {fv_sim} %')
            return candidate['original_url']
    return None
# Directory containing the images to match.
target_path = 'test'
# Final results: [file name, matched original URL] pairs.
matched_list = []
# Set view of the processed-file record for constant-time membership tests.
seen = set(record)

# A failure to list the directory is allowed to propagate: retrying it would
# fail the same way every time.
for fname in tqdm(os.listdir(target_path), ascii=True):
    # Skip files handled in this or an earlier run.
    if fname in seen:
        continue
    try:
        # Mark the file as processed before doing any work, so a file that
        # fails is not retried on the next run.
        seen.add(fname)
        record.append(fname)
        with open('record.json', 'wb') as wp:
            wp.write(json.dumps(record).encode('utf-8'))
        # Ask Ascii2D for candidate matches.
        thumbnail_list = get_match_list(fname)
        if thumbnail_list is None:
            continue
        # Download the candidate thumbnails and let the neural network decide
        # whether any of them is similar enough.
        match_result = check_if_match(f'{target_path}/{fname}', thumbnail_list)
        if match_result is not None:
            matched_list.append([fname, match_result])
            print(f'\nOriginal: {fname} -> 匹配到: {match_result}')
        else:
            print(f'\nOriginal: {fname} -> 未匹配成功!')
    except Exception as exc:
        # Catch Exception rather than everything, so Ctrl+C and SystemExit
        # still stop the script; report what went wrong and move on.
        print(f'Internal ERROR: {exc!r}')
print(matched_list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment