Skip to content

Instantly share code, notes, and snippets.

@journey-ad
Last active May 4, 2019 06:22
Show Gist options
  • Save journey-ad/0885c71bc5d3e2ca8822defc16b2da55 to your computer and use it in GitHub Desktop.
Save journey-ad/0885c71bc5d3e2ca8822defc16b2da55 to your computer and use it in GitHub Desktop.
对下载好的P站作品进行分类
import os
import sys
import time
import argparse
import math
import json
import threading
import requests
import sqlite3
# Version string reported by the -v/--version command-line flag.
SCRIPT_VERSION = "1.1"
# Base URL that get_illust_detail() sends its metadata requests to.
API = 'https://api.imjad.cn/pixiv/v1/'
def get_illust_detail(illust_id):
    """Return the metadata of one illustration, using pixiv.db as a cache.

    The ``illust_id2illust`` table of ``pixiv.db`` (in the current working
    directory) is consulted first.  A cached row is used unless the
    module-level ``IS_ALL`` flag is true; otherwise the metadata is requested
    from ``API`` and, when the response reports success, the raw response
    text is stored in the table.

    Args:
        illust_id: Illustration ID used both as the cache key and as the
            ``id`` request parameter.

    Returns:
        The decoded JSON response, or ``None`` when the API response status
        is not ``'success'`` (the API error message is printed in that case).

    Raises:
        Exception: Any database, network or decoding error is re-raised
            after the offending ``illust_id`` has been printed.
    """
    try:
        conn = sqlite3.connect(os.path.join(os.getcwd(), 'pixiv.db'))
        try:
            cursor = conn.cursor()
            cursor.execute(
                'SELECT illust FROM illust_id2illust WHERE illust_id=?;',
                (illust_id,))
            cached = cursor.fetchone()
            if cached and not IS_ALL:
                return json.loads(cached[0])

            params = {
                'type': 'illust',
                'id': illust_id,
                'action': 'update'
            }
            r = requests.get(API, params=params, timeout=20)
            resp = json.loads(r.text)
            if resp['status'] != 'success':
                print(resp['errors']['system']['message'])
                return None

            cursor.execute(
                'REPLACE INTO illust_id2illust (illust_id, illust) '
                'VALUES (?, ?);',
                (illust_id, r.text))
            conn.commit()
            return resp
        finally:
            # Runs on every exit path (cache hit, API failure, exception) so
            # the connection is never left open.
            conn.close()
    except Exception:
        # Show which illustration failed, then keep the original traceback.
        print(illust_id)
        raise
def get_file_list(path, is_all):
    """Collect the paths of all non-JSON files found under ``path``.

    Args:
        path: Directory to scan.
        is_all: When true the whole tree below ``path`` is walked; otherwise
            only the direct (non-directory) entries of ``path`` are listed.

    Returns:
        A list of file paths, each joined onto the directory it was found in.
    """
    def _is_wanted(entry):
        # Anything whose last dot-separated segment is 'json' is skipped.
        return entry.rsplit('.', 1)[-1] != 'json'

    if is_all:
        return [
            os.path.join(folder, entry)
            for folder, _subdirs, entries in os.walk(path)
            for entry in entries
            if _is_wanted(entry)
        ]

    return [
        os.path.join(path, entry)
        for entry in os.listdir(path)
        if not os.path.isdir(os.path.join(path, entry)) and _is_wanted(entry)
    ]
def get_illust_id_by_filename(filename):
    """Extract the illustration ID from a file name or path.

    The ID is the part of the base name that precedes the first underscore
    (``123_p0.png`` -> ``'123'``).  Both ``\\`` and ``/`` are accepted as
    directory separators so the result does not depend on the platform the
    path came from.

    Args:
        filename: File name or full path of a downloaded image.

    Returns:
        The ID as a string, or ``-1`` when the base name yields an empty ID.
    """
    # Normalise separators first so only the final path component is parsed.
    base_name = filename.replace('\\', '/').rsplit('/', 1)[-1]
    illust_id = base_name.split('_')[0]
    return illust_id if illust_id else -1
def chunks(list_, num):
    """Split ``list_`` into at most ``num`` consecutive slices.

    Every slice holds ``ceil(len(list_) / num)`` items except possibly the
    last one, which holds the remainder.

    Args:
        list_: Sequence to split.
        num: Maximum number of slices to produce.

    Returns:
        A list of slices of ``list_``; an empty list when ``list_`` is empty.
    """
    if not list_:
        # A zero chunk size would make range() raise ValueError below.
        return []
    size = int(math.ceil(len(list_) / float(num)))
    return [list_[i:i + size] for i in range(0, len(list_), size)]
def get_format_filename(input_filename):
    """Return ``input_filename`` made safe for use as a path component.

    Removes the characters ``? * < > \\ / : " |`` and then strips leading and
    trailing whitespace.  The colon, double quote and pipe are included
    because Windows rejects them in file and directory names as well.

    Args:
        input_filename: Raw name, e.g. an artist name or illustration title.

    Returns:
        The cleaned name (possibly empty).
    """
    for forbidden in ('?', '*', '<', '>', '\\', '/', ':', '"', '|'):
        input_filename = input_filename.replace(forbidden, '')
    # Strip last so whitespace exposed by the removals is trimmed too.
    return input_filename.strip()
# Lower bounds of the favourite-count folders used by the 'fav_num' modes.
_FAV_THRESHOLDS = (0, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000)


def _fav_bucket(fav_num):
    """Return the largest entry of _FAV_THRESHOLDS that is <= fav_num."""
    bucket = _FAV_THRESHOLDS[0]
    for threshold in _FAV_THRESHOLDS:
        if fav_num < threshold:
            break
        bucket = threshold
    return bucket


def _age_limit_dir(illust, illust_id):
    """Return 'r18' / 'r18-g' for restricted works (announcing them), else None."""
    age_limit = illust['age_limit']
    if age_limit == 'r18':
        print('R-18内容(%s)' % (illust_id))
        return 'r18'
    if age_limit == 'r18-g':
        print('R-18G内容(%s)' % (illust_id))
        return 'r18-g'
    return None


def illust_sort(filelist, path='.', type='member_r18'):
    """Move downloaded illustration files into folders chosen by ``type``.

    Files are grouped by illustration ID (the part of the base name before
    the first underscore).  For every ID a destination below ``path`` is
    derived, created if missing, and every file in the source directory whose
    name starts with that ID is moved there.

    Args:
        filelist: Paths of the files to sort.
        path: Root directory below which the destination folders are created.
        type: Sorting mode:
            ``'member_r18'`` - ``[r18|r18-g/]<artist name>(<artist id>)/<title>``;
            ``'fav_num'`` - folder named after the favourite-count bucket;
            ``'fav_num_r18'`` - like ``'fav_num'`` with an ``r18``/``r18-g``
            parent folder for restricted works;
            ``'r18'`` - ``r18``/``r18-g`` folder for restricted works, ``path``
            itself otherwise;
            ``'reset'`` - move everything back into ``path`` without looking
            up metadata and remove source directories that become empty.

    Works for which ``get_illust_detail`` returns ``None`` go to
    ``<path>/deleted``.  An unknown ``type`` prints an error and calls
    ``sys.exit()`` after the first successful metadata lookup.
    """
    reset = type == 'reset'

    # Map every illustration ID to the directory its files live in.  The base
    # name is taken after either separator so that paths produced on POSIX
    # systems are parsed correctly too.
    illust_dict = {}
    for illust_file_name in filelist:
        base_name = illust_file_name.replace('\\', '/').rsplit('/', 1)[-1]
        illust_dict[base_name.split('_')[0]] = os.path.dirname(illust_file_name)

    for illust_id, old_file_path in illust_dict.items():
        if reset:
            file_path = path
        else:
            print('获取作品(%s) 详情…' % (illust_id))
            illust_detail = get_illust_detail(illust_id)
            if illust_detail is None:
                file_path = '%s/deleted' % (path)
            elif type == 'member_r18':
                illust = illust_detail['response'][0]
                member = illust['user']
                member_name = '%s(%s)' % (member['name'], member['id'])
                sub_dir = '%s/%s' % (get_format_filename(member_name),
                                     get_format_filename(illust['title']))
                age_dir = _age_limit_dir(illust, illust_id)
                if age_dir:
                    sub_dir = '%s/%s' % (age_dir, sub_dir)
                file_path = '%s/%s' % (path, sub_dir)
            elif type in ('fav_num', 'fav_num_r18'):
                illust = illust_detail['response'][0]
                favorited = illust['stats']['favorited_count']
                sub_dir = _fav_bucket(int(favorited['public']) + int(favorited['private']))
                if type == 'fav_num_r18':
                    age_dir = _age_limit_dir(illust, illust_id)
                    if age_dir:
                        sub_dir = '%s/%s' % (age_dir, sub_dir)
                file_path = '%s/%s' % (path, sub_dir)
            elif type == 'r18':
                illust = illust_detail['response'][0]
                age_dir = _age_limit_dir(illust, illust_id)
                file_path = '%s/%s' % (path, age_dir) if age_dir else path
            else:
                print('type 错误')
                sys.exit()
            # Pause after every lookup (presumably to go easy on the API;
            # note that it also applies to cache hits).
            time.sleep(0.5)

        os.makedirs(file_path, exist_ok=True)

        # Compare with uniform separators so 'a\\b' and 'a/b' count as equal.
        old_file_path = old_file_path.replace('\\', '/')
        file_path = file_path.replace('\\', '/')
        if old_file_path == file_path:
            continue

        print('移动 %s/%s 至 %s' % (old_file_path, illust_id, file_path))
        for file_name in os.listdir(old_file_path):
            if file_name.split('_')[0] == illust_id:
                os.rename(os.path.join(old_file_path, file_name),
                          os.path.join(file_path, file_name))
        if reset and not os.listdir(old_file_path):
            os.rmdir(old_file_path)
if __name__ == '__main__':
    # Command-line entry point: parse the arguments, make sure the cache
    # database exists, collect the files and sort them in worker threads.
    PARSER = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description='对下载好的 Pixiv 作品进行分类',
                                     epilog='(c) 2017 journey.ad')
    PARSER.add_argument('SORT_TYPE', type=str, nargs="?", help='分类方法')
    PARSER.add_argument('PATH', type=str, nargs="?", help='作品目录')
    PARSER.add_argument('ALL', type=str, nargs="?", help='是否整理所有文件')
    PARSER.add_argument("-v", "--version", action="version",
                        version="pixiv-content-sort-helper {}".format(SCRIPT_VERSION))
    ARGS = PARSER.parse_args()

    # Create the cache tables up front so get_illust_detail() can rely on
    # them.  Errors simply propagate and terminate the script.
    print('连接数据库…')
    conn = sqlite3.connect(os.path.join(os.getcwd(), 'pixiv.db'))
    print('数据库连接成功')
    try:
        cursor = conn.cursor()
        cmd = '''
        CREATE TABLE IF NOT EXISTS illust_id2illust (
        illust_id INT PRIMARY KEY
        NOT NULL,
        illust TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS member_id2profile (
        member_id INT PRIMARY KEY
        NOT NULL,
        profile TEXT NOT NULL
        );
        '''
        cursor.executescript(cmd)
        cursor.close()
        conn.commit()
    finally:
        conn.close()

    # Sorting mode defaults to 'member_r18' when omitted.
    SORT_TYPE = ARGS.SORT_TYPE if ARGS.SORT_TYPE else 'member_r18'

    if not ARGS.PATH:
        print('请输入作品目录')
        # sys.exit() rather than the site-provided exit(), which is meant for
        # interactive sessions and is not guaranteed to exist.
        sys.exit()
    PATH = os.path.join(os.getcwd(), ARGS.PATH)

    # Only the literal string 'true' enables recursive scanning; the same
    # flag is read as a module global by get_illust_detail().
    IS_ALL = ARGS.ALL == 'true'

    print('准备对 %s 目录内作品进行分类…' % (PATH))
    LIST = get_file_list(PATH, IS_ALL)
    print('遍历完成, 共计 %s 件作品' % (len(LIST)))
    if not LIST:
        print('所有图像都已被分类')
        sys.exit()

    # Split the work into at most five slices, one worker thread per slice.
    LIST = chunks(LIST, 5)
    JOBS = [threading.Thread(target=illust_sort, args=(item, PATH, SORT_TYPE))
            for item in LIST]
    for job in JOBS:
        job.start()
    for job in JOBS:
        job.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment