Skip to content

Instantly share code, notes, and snippets.

@journey-ad
Last active February 26, 2024 07:29
Show Gist options
  • Save journey-ad/14388855f7620755e21ecf239dbc47b1 to your computer and use it in GitHub Desktop.
P站批量下载脚本
'''
Pixiv batch download script.

Requires administrator privileges (to create symbolic links).
Requires ffmpeg to be available on the PATH environment variable.
Only tested on the Windows platform.
'''
import os
import argparse
import math
import json
import threading
import requests
import zipfile
import subprocess
import shutil
import tempfile
import sqlite3
from contextlib import contextmanager

SCRIPT_VERSION = "1.0"
# Third-party Pixiv API proxy; every network request below goes through it.
API = 'https://api.imjad.cn/pixiv/v1/'
# Startup working directory: the cache DB and the shared 'illusts' pool live here.
CWD = os.getcwd()
def get_member_info(member_id):
    """Return the API profile response for a Pixiv member, using a local cache.

    Looks the member up in the sqlite cache (CWD/pixiv.db) first; on a miss
    it queries the remote API and stores the raw response on success.

    Args:
        member_id: Pixiv member id.

    Returns:
        dict: parsed API response.  May be an error response whose 'status'
        is not 'success' — callers should check it.

    Raises:
        Propagates network / JSON / sqlite errors unchanged.
    """
    conn = sqlite3.connect(os.path.join(CWD, 'pixiv.db'))
    try:
        cursor = conn.cursor()
        cursor.execute(
            'SELECT profile FROM member_id2profile WHERE member_id=?;',
            (member_id,))
        row = cursor.fetchone()
        if row:
            resp = json.loads(row[0])
        else:
            r = requests.get(API, params={'type': 'member', 'id': member_id},
                             timeout=20)
            resp = json.loads(r.text)
            if resp['status'] == 'success':
                # Cache the raw response body, keyed by member id.
                cursor.execute(
                    'INSERT INTO member_id2profile (member_id, profile) '
                    'VALUES (?, ?);',
                    (member_id, r.text))
            else:
                # Best effort: log the API error; the error response is
                # still returned for the caller to inspect.
                print(resp['errors']['system']['message'])
        cursor.close()
        conn.commit()
        return resp
    finally:
        # The original leaked the connection when an exception escaped.
        conn.close()
def get_img_url(illust_id, img_type='large'):
    """Resolve the downloadable image URLs for an illustration.

    Uses the sqlite cache (CWD/pixiv.db) before querying the remote API.

    Args:
        illust_id: Pixiv illustration id.
        img_type: image size key in the API response (default 'large').

    Returns:
        (page_list, frames): list of image URLs and, for ugoira works, the
        frame timing metadata (empty list otherwise).  Returns (None, None)
        when the API reports an error (e.g. a deleted work).
    """
    conn = sqlite3.connect(os.path.join(CWD, 'pixiv.db'))
    try:
        cursor = conn.cursor()
        cursor.execute(
            'SELECT illust FROM illust_id2illust WHERE illust_id=?;',
            (illust_id,))
        row = cursor.fetchone()
        if row:
            resp = json.loads(row[0])
        else:
            r = requests.get(API, params={'type': 'illust', 'id': illust_id},
                             timeout=20)
            resp = json.loads(r.text)
            if resp['status'] == 'success':
                cursor.execute(
                    'INSERT INTO illust_id2illust (illust_id, illust) '
                    'VALUES (?, ?);',
                    (illust_id, r.text))
            else:
                print(resp['errors']['system']['message'])
                # Original returned here without closing the connection.
                return None, None
        page_list = []
        frames = []
        metadata = resp['response'][0]['metadata']
        if not metadata:
            # Single-page work: one direct image URL.
            page_list.append(resp['response'][0]['image_urls'][img_type])
        elif 'zip_urls' in metadata:
            # Ugoira animation: one zip of frames plus timing metadata.
            frames = metadata['frames']
            page_list.append(metadata['zip_urls']['ugoira1920x1080'])
        else:
            # Multi-page work: one URL per page.
            for page in metadata['pages']:
                page_list.append(page['image_urls'][img_type])
        cursor.close()
        conn.commit()
        return page_list, frames
    except Exception:
        # Log which work failed before re-raising.
        print(illust_id)
        raise
    finally:
        conn.close()
def get_user_fav(user_id):
    """Return the id of every work in *user_id*'s public bookmark list.

    Pages through the proxy API at 1000 works per page.  On any error the
    whole fetch restarts from page 1.  The original retried via unbounded
    recursion, which could exhaust the stack on a persistently failing
    network; a plain loop keeps the retry-forever behaviour without that
    risk.
    """
    while True:
        try:
            params = {
                'type': 'favorite',
                'id': user_id,
                'page': 1,
                'per_page': 1000
            }
            r = requests.get(API, params=params, timeout=20)
            resp = json.loads(r.text)
            illust_id_list = []
            pages = int(resp['pagination']['pages'])
            print('共 %s 页,正在处理第 1 页' % (pages))
            for illust in resp['response']:
                illust_id_list.append(illust['work']['id'])
            for x in range(2, pages + 1):
                params['page'] = x
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                print('共 %s 页,正在处理第 %s 页' % (pages, x))
                for illust in resp['response']:
                    illust_id_list.append(illust['work']['id'])
            return illust_id_list
        except Exception:
            print('获取收藏列表失败,正在重试…')
def get_user_illust(user_id, fav_limit=0):
    """Return ids of *user_id*'s own works with at least *fav_limit* bookmarks.

    The threshold compares against the sum of public and private bookmark
    counts.  On any error the whole fetch restarts from page 1 (the
    original used unbounded recursion for this; a loop avoids blowing the
    stack on persistent failures).
    """
    def _collect(resp, illust_id_list):
        # Append ids whose total bookmark count reaches the threshold;
        # log and skip the rest.
        for illust in resp['response']:
            fav_counts = illust['stats']['favorited_count']
            fav_count = int(fav_counts['public']) + int(fav_counts['private'])
            if fav_count >= int(fav_limit):
                illust_id_list.append(illust['id'])
            else:
                print('作品 %s 收藏数为 %s, 低于阈值 %s, 忽略' % (illust['id'], fav_count, fav_limit))

    while True:
        try:
            params = {
                'type': 'member_illust',
                'id': user_id,
                'page': 1,
                'per_page': 1000
            }
            r = requests.get(API, params=params, timeout=20)
            resp = json.loads(r.text)
            illust_id_list = []
            pages = int(resp['pagination']['pages'])
            print('共 %s 页,正在处理第 1 页' % (pages))
            _collect(resp, illust_id_list)
            for x in range(2, pages + 1):
                params['page'] = x
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                print('共 %s 页,正在处理第 %s 页' % (pages, x))
                _collect(resp, illust_id_list)
            return illust_id_list
        except Exception:
            print('获取作品列表失败,正在重试…')
def get_search_list(word, fav_limit=0):
    """Return ids of works whose tags match *word* with enough bookmarks.

    Args:
        word: search keyword (tag mode); the process exits when missing.
        fav_limit: minimum total (public + private) bookmark count.

    Works below the threshold are silently skipped (logging was already
    commented out in the original).  Retries forever on errors via a loop
    instead of the original unbounded recursion.
    """
    if word is None:  # original compared with == None
        print('请输入关键词')
        exit()
    while True:
        try:
            params = {
                'type': 'search',
                'mode': 'tag',
                'word': word,
                'per_page': 1000
            }
            r = requests.get(API, params=params, timeout=20)
            resp = json.loads(r.text)
            illust_id_list = []
            pages = int(resp['pagination']['pages'])
            print('共 %s 页,正在处理第 1 页' % (pages))
            for illust in resp['response']:
                counts = illust['stats']['favorited_count']
                if int(counts['public']) + int(counts['private']) >= int(fav_limit):
                    illust_id_list.append(illust['id'])
            for x in range(2, pages + 1):
                params['page'] = x
                r = requests.get(API, params=params, timeout=20)
                resp = json.loads(r.text)
                print('共 %s 页,正在处理第 %s 页' % (pages, x))
                for illust in resp['response']:
                    counts = illust['stats']['favorited_count']
                    if int(counts['public']) + int(counts['private']) >= int(fav_limit):
                        illust_id_list.append(illust['id'])
            return illust_id_list
        except Exception:
            print('获取搜索结果列表失败,正在重试…')
def get_file_id_list(path):
    """Scan *path* recursively and return illust ids already downloaded.

    A work counts as present when a non-empty jpg/png/gif named
    '<illust_id>_...' exists, or when a downloaded ugoira zip has a
    matching converted .webm.  Ids whose image file exists but is zero
    bytes are treated as missing so they get re-downloaded.

    The original crashed with ValueError on any file whose name did not
    start with a numeric id (e.g. the saved list json); such files are
    now skipped.
    """
    present = set()
    truncated = set()
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            try:
                illust_id = int(name.split('_')[0])
            except ValueError:
                continue  # not a '<id>_...' download artifact
            ext = name.split('.')[-1]
            if ext in ('jpg', 'png', 'gif'):
                if os.path.getsize(os.path.join(dirpath, name)) > 0:
                    present.add(illust_id)
                else:
                    truncated.add(illust_id)
            elif ext == 'zip':
                webm_name = name.split('.')[0] + '.webm'
                if os.path.exists(os.path.join(dirpath, webm_name)):
                    present.add(illust_id)
    return list(present - truncated)
def download(list_, img_type='large', path='.'):
    """Download every illust in *list_* into the shared CWD/illusts pool and
    expose each file inside *path* via a symlink.

    Symlink creation needs administrator rights on Windows (see module
    docstring).  Ugoira works are additionally converted to .webm and the
    result symlinked too.  Works the API reports as deleted are restored
    from the local pool when a backup exists.

    Args:
        list_: iterable of illust ids.
        img_type: API image size key passed to get_img_url.
        path: destination directory for the symlinks.
    """
    HEADERS = {
        'Referer': 'https://www.pixiv.net'
    }
    for illust_id in list_:
        print('获取作品(%s) 分页列表…' % (illust_id))
        page_list, frames = get_img_url(illust_id, img_type)
        if page_list:
            for url in page_list:
                file_name = url.split('/')[-1]
                file_path = os.path.join(CWD, 'illusts', file_name)
                link_path = os.path.join(path, file_name)
                print('下载中,保存至 %s' % (link_path))
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                os.makedirs(os.path.dirname(link_path), exist_ok=True)
                if file_exists(link_path):
                    print('图片已存在')
                else:
                    if not file_exists(file_path):
                        # Pool miss: fetch the image bytes once; later runs
                        # reuse the pooled copy.
                        with open(file_path, 'wb') as file:
                            file.write(requests.get(url, headers=HEADERS, timeout=30).content)
                    os.symlink(file_path, link_path)
                if frames:
                    print('发现动图,转换为WEBM…')
                    ugoira2webm(file_path, frames)
                    webm_filename = os.path.basename(file_path).split('.')[0] + ".webm"
                    file_path = os.path.join(CWD, 'illusts', webm_filename)
                    link_path = os.path.join(path, webm_filename)
                    # Guard against FileExistsError on re-runs (original
                    # linked unconditionally).
                    if not os.path.lexists(link_path):
                        os.symlink(file_path, link_path)
        else:
            print('投稿已被删除(%s),尝试从库中恢复…' % (illust_id))
            illusts_path = os.path.join(CWD, 'illusts')
            if illust_id_exists(illust_id):
                print('发现备份文件(%s),正在恢复…' % (illust_id))
                for file_name in os.listdir(illusts_path):
                    try:
                        matches = int(file_name.split('_')[0]) == illust_id
                    except ValueError:
                        continue  # stray file without an '<id>_...' prefix
                    if matches:
                        file_path = os.path.join(illusts_path, file_name)
                        link_path = os.path.join(path, file_name)
                        if not os.path.lexists(link_path):
                            os.symlink(file_path, link_path)
def chunks(list_, num):
    """Split *list_* into at most *num* consecutive, equally sized slices.

    Args:
        list_: sequence to split.
        num: desired number of slices (the result may contain fewer).

    Returns:
        List of slices covering *list_* in order.  An empty input yields []
        (the original raised ValueError because the chunk size came out 0,
        making range()'s step zero).
    """
    if not list_:
        return []
    size = int(math.ceil(len(list_) / float(num)))
    return [list_[i:i + size] for i in range(0, len(list_), size)]
def get_format_filename(input_filename):
    """Strip characters that are illegal in Windows file names.

    Removes the original blacklist (? * < > \\ ! /) plus the remaining
    characters Windows rejects in path components (: " |), then trims
    surrounding whitespace.  One str.translate pass replaces the original
    quadratic repeated-replace loop.
    """
    return input_filename.translate(str.maketrans('', '', '?*<>\\!/:"|')).strip()
def file_exists(path):
    """Return True when *path* exists and has a size greater than zero."""
    return os.path.exists(path) and os.path.getsize(path) > 0
def illust_id_exists(illust_id):
    """Return True when a non-empty file for *illust_id* is in CWD/illusts.

    Downloaded files are named '<illust_id>_...'; names that do not match
    the pattern are ignored (the original raised ValueError on them).
    The original also fell through returning None on a miss; False is now
    returned explicitly.
    """
    pool_path = os.path.join(CWD, 'illusts')
    for dirpath, _dirnames, filenames in os.walk(pool_path):
        for name in filenames:
            try:
                matches = int(name.split('_')[0]) == illust_id
            except ValueError:
                continue
            if matches and os.path.getsize(os.path.join(dirpath, name)) > 0:
                return True
    return False
@contextmanager
def cd(newdir):
    """Context manager: temporarily make *newdir* the working directory.

    '~' in *newdir* is expanded; the previous directory is always restored,
    even when the body raises.
    """
    previous = os.getcwd()
    os.chdir(os.path.expanduser(newdir))
    try:
        yield
    finally:
        os.chdir(previous)
def ugoira2webm(file_path, frames):
    """Convert a downloaded ugoira zip into a lossless VP9 .webm via ffmpeg.

    The frames are extracted into a temp directory, an ffconcat script with
    the per-frame display durations is written, and ffmpeg assembles the
    video, which is then moved next to the source zip.  Skips work already
    converted.

    Args:
        file_path: path of the ugoira zip archive.
        frames: API frame metadata; each entry has 'file' and 'delay_msec'.

    Raises:
        RuntimeError: when ffmpeg exits non-zero.  (The original called
        exit() from inside this helper, killing the whole process.)
    """
    out_dir = os.path.dirname(file_path)
    base = '.'.join(file_path.split('.')[:-1])
    webm_filename = os.path.basename(base) + ".webm"
    if file_exists(os.path.join(out_dir, webm_filename)):
        return  # already converted
    with tempfile.TemporaryDirectory(prefix="ugoira2webm") as d:
        with zipfile.ZipFile(file_path) as f:
            f.extractall(d)
        # ffconcat script: frame file names and display durations (seconds).
        lines = ["ffconcat version 1.0"]
        for frame in frames:
            lines.append("file " + frame['file'])
            lines.append("duration " + str(frame['delay_msec'] / 1000))
        with cd(d):
            with open("i.ffconcat", "w") as f:
                f.write('\n'.join(lines) + '\n')
            # An argument list avoids the shell-quoting hazards of the
            # original os.popen() command string.
            proc = subprocess.run(
                ["ffmpeg", "-n", "-v", "quiet", "-i", "i.ffconcat",
                 "-c:v", "libvpx-vp9", "-lossless", "1", webm_filename])
            if proc.returncode != 0:
                raise RuntimeError('ffmpeg failed with code %s' % proc.returncode)
            shutil.move(os.path.join(d, webm_filename),
                        os.path.join(out_dir, webm_filename))
if __name__ == '__main__':
    # CLI: all positionals are optional strings; MEMBER_ID doubles as the
    # search keyword or the input-file path depending on GET_TYPE.
    PARSER = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description='Pixiv 批量下载脚本',
                                     epilog='(c) 2017 journey.ad')
    PARSER.add_argument('MEMBER_ID', type=str, nargs="?", help='用户的 Pixiv ID')
    PARSER.add_argument('GET_TYPE', type=str, nargs="?", help='获取类型 illust favorite search file')
    PARSER.add_argument('IMG_TYPE', type=str, nargs="?", help='图片类型')
    PARSER.add_argument('PATH', type=str, nargs="?", help='保存地址')
    PARSER.add_argument('FAV_LIMIT', type=str, nargs="?", help='收藏数阈值')
    PARSER.add_argument("-v", "--version", action="version",
                        version="pixiv-favorite-download-helper {}".format(SCRIPT_VERSION))
    ARGS = PARSER.parse_args()
    try:
        # Create the cache tables up front so the worker threads can rely
        # on them existing.
        print('连接数据库…')
        conn = sqlite3.connect(os.path.join(CWD, 'pixiv.db'))
        print('数据库连接成功')
        cursor = conn.cursor()
        try:
            cmd = '''
            CREATE TABLE IF NOT EXISTS illust_id2illust (
                illust_id INT PRIMARY KEY NOT NULL,
                illust TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS member_id2profile (
                member_id INT PRIMARY KEY NOT NULL,
                profile TEXT NOT NULL
            );
            '''
            cursor.executescript(cmd)
        except Exception as err:
            raise err
        finally:
            cursor.close()
            conn.commit()
            conn.close()
    except Exception as err:
        raise err
    if ARGS.MEMBER_ID:
        MEMBER_ID = ARGS.MEMBER_ID
    else:
        print('请输入用户的 Pixiv ID')
        exit()
    IMG_TYPE = ARGS.IMG_TYPE if ARGS.IMG_TYPE else 'large'
    GET_TYPE = ARGS.GET_TYPE if ARGS.GET_TYPE else 'illust'
    if GET_TYPE == 'search':
        KEY_WORD = MEMBER_ID
    elif GET_TYPE == 'file':
        INPUT_FILE = MEMBER_ID
    else:
        MEMBER_NAME = get_member_info(MEMBER_ID)['response'][0]['name']
    # NOTE(review): for GET_TYPE 'search' or 'file' MEMBER_NAME is never
    # assigned, so this default raises NameError unless PATH was passed
    # explicitly — confirm those modes are meant to require PATH.
    PATH = ARGS.PATH if ARGS.PATH else os.path.join(CWD, '%s(%s)' % (get_format_filename(MEMBER_NAME), MEMBER_ID))
    FAV_LIMIT = ARGS.FAV_LIMIT if ARGS.FAV_LIMIT else 0
    try:
        # Resolve the final save directory and the id list per mode.  The
        # id list is cached as json next to the images so an interrupted
        # run can resume without re-querying the API.
        if GET_TYPE == 'illust':
            PATH = os.path.join(CWD, '%s/%s(%s)' % (PATH, get_format_filename(MEMBER_NAME), MEMBER_ID))
            print('获取用户 %s(%s) 的作品列表…' % (MEMBER_NAME, MEMBER_ID))
            json_name = '%s/%s_%s.json' % (PATH, get_format_filename(MEMBER_ID), FAV_LIMIT)
            if os.path.exists(json_name):
                with open(json_name, 'r') as f:
                    LIST = json.load(f)
            else:
                LIST = get_user_illust(MEMBER_ID, FAV_LIMIT)
        elif GET_TYPE == 'favorite':
            PATH = os.path.join(CWD, '%s/%s(%s)' % (PATH, get_format_filename(MEMBER_NAME), MEMBER_ID))
            print('获取用户 %s(%s) 的收藏列表…' % (MEMBER_NAME, MEMBER_ID))
            json_name = '%s/%s_%s.json' % (PATH, get_format_filename(MEMBER_ID), FAV_LIMIT)
            # NOTE(review): unlike the other modes, favourites are always
            # re-fetched — the cached json is written below but never read.
            LIST = get_user_fav(MEMBER_ID)
        elif GET_TYPE == 'search':
            PATH = os.path.join(CWD, '%s/%s' % (PATH, get_format_filename(KEY_WORD)))
            print('获取关键词 %s 的结果列表…' % KEY_WORD)
            json_name = '%s/%s_%s.json' % (PATH, get_format_filename(KEY_WORD), FAV_LIMIT)
            if os.path.exists(json_name):
                with open(json_name, 'r') as f:
                    LIST = json.load(f)
            else:
                LIST = get_search_list(KEY_WORD, FAV_LIMIT)
        elif GET_TYPE == 'file':
            print('从 %s 获取下载列表…' % INPUT_FILE)
            json_name = '%s/list_%s.json' % (PATH, FAV_LIMIT)
            with open(INPUT_FILE, 'r') as f:
                LIST = json.load(f)
        # Persist the resolved id list for resumption.
        os.makedirs(os.path.dirname(json_name), exist_ok=True)
        with open(json_name, 'w') as f:
            json.dump(LIST, f)
        # Skip works whose files are already fully downloaded.
        EXIST_LIST = get_file_id_list(PATH)
        print('获取成功, 共计 %s 件作品, 其中 %s 件已存在, 将自动跳过' % (len(LIST), len(EXIST_LIST)))
        LIST = list(set(LIST) - set(EXIST_LIST))
        if LIST:
            # Partition the remaining ids across 5 download threads.
            LIST = chunks(LIST, 5)
        else:
            print('所有图像都已被下载')
            os.sys.exit()
        JOBS = []
        for item in LIST:
            JOBS.append(threading.Thread(target=download, args=(item, IMG_TYPE, PATH)))
        for job in JOBS:
            job.start()
        for job in JOBS:
            job.join()
    except Exception as err:
        raise err
    finally:
        # conn was already closed above; sqlite3 tolerates a second close.
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment