/bilibili.py Secret
Created
January 21, 2021 10:12
哔哩哔哩视频上传
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# @Time : 2021/1/8 22:29 | |
# @Author : Virace | |
# @Email : Virace@aliyun.com | |
# @Site : x-item.com | |
# @Software: PyCharm | |
# @Detail : B站视频上传相关 | |
import base64 | |
import datetime | |
import hashlib | |
import json | |
import logging | |
import math | |
import os | |
import time | |
from dataclasses import dataclass | |
from typing import * | |
from urllib import parse | |
import requests | |
import rsa | |
from requests.adapters import HTTPAdapter | |
from urllib3 import Retry | |
# logging.captureWarnings(True) | |
log = logging.getLogger(__name__) | |
# log.addHandler(logging.NullHandler()) | |
_TOOLS_VERSION = '2.2.0.1057' | |
@dataclass(repr=False) | |
class VideoChunk: | |
filename: str | |
filesize: int | |
md5: str | |
chunk: int | |
chunks: int | |
chunk_data: bytes | |
version: str = _TOOLS_VERSION | |
@dataclass | |
class VideoPart: | |
path: str | |
title: str = '' | |
desc: str = '' | |
server_name: str = '' | |
class Bilibili: | |
# 上传工具的key以及secret | |
_APPKEY = 'aae92bc66f3edfab' | |
_APPSECRET = 'af125a0d5279fd576c1b4418a3e8276d' | |
_TOKENFILE = '_token' | |
_TOKENEXPIRES = 2592000 | |
def __init__(self, token_file: Union[str, bool] = None): | |
""" | |
初始化, 如果填写已保存的token文件则自动登录 | |
:param token_file: | |
""" | |
self.session = requests.session() | |
self.session.verify = False | |
# debug | |
def debug_response(r, *args, **kwargs): | |
log.debug(r.text) | |
self.session.hooks = {'response': debug_response} | |
self.session.headers.update({ | |
'Accept': None, | |
# 'Content-Type': 'application/x-www-form-urlencoded', | |
'User-Agent': '' | |
}) | |
self.access_token = None | |
self.refresh_token = None | |
# cookie | |
self.sid = None | |
# 用户id | |
self.mid = None | |
self.cookie_time: Union[int, None] = None | |
if token_file: | |
if isinstance(token_file, str): | |
temp = token_file | |
elif isinstance(token_file, bool): | |
temp = self._TOKENFILE | |
else: | |
raise Exception('Token文件参数类型错误.') | |
self._load_token_file(temp) | |
# 验证token时间 | |
self._check_token_time() | |
self.session.cookies.update({'sid': self.sid}) | |
if self.access_token and self.refresh_token: | |
self._get_user_info() | |
def _check_token_time(self): | |
if self.cookie_time: | |
to = datetime.datetime.fromtimestamp(self.cookie_time) | |
interval = to - datetime.datetime.now() | |
if interval.days <= 2: | |
# 需要更新token | |
self._refresh_token() | |
pass | |
def _get_user_info(self): | |
data = self._signed_body( | |
f'access_key={self.access_token}&appkey={self._APPKEY}&platform=pc&ts={int(time.time())}') | |
response = self.session.get( | |
url="https://app.bilibili.com/x/v2/account/myinfo", | |
params=data | |
) | |
response.raise_for_status() | |
res = response.json() | |
assert res['code'] == 0, res['message'] | |
data = res['data'] | |
self.mid = data['mid'] | |
log.info(data['name']) | |
def _load_token_file(self, token_file): | |
assert os.path.exists(token_file), 'token文件不存在' | |
with open(token_file, encoding='utf-8') as f: | |
data = json.load(f) | |
self.access_token = data['access_token'] | |
self.refresh_token = data['refresh_token'] | |
self.cookie_time = data['cookie_time'] | |
self.sid = data['sid'] | |
def _save_user_token(self): | |
if self.access_token and self.refresh_token: | |
with open(self._TOKENFILE, 'w+', encoding='utf-8') as f: | |
json.dump(dict( | |
access_token=self.access_token, | |
refresh_token=self.refresh_token, | |
sid=self.sid, | |
cookie_time=(datetime.datetime.now() + datetime.timedelta(seconds=self._TOKENEXPIRES)).timestamp() | |
), f) | |
def _signed_body(self, body: Union[str, dict]) -> Union[str, dict]: | |
""" | |
:return: body which be added sign | |
""" | |
def md5(s): | |
obj = hashlib.md5() | |
obj.update(s.encode('utf-8')) | |
return obj.hexdigest() | |
def sign(s): | |
""" | |
:return: return sign | |
""" | |
return md5(s + self._APPSECRET) | |
if isinstance(body, str): | |
return body + '&sign=' + sign(body) | |
elif isinstance(body, dict): | |
ls = [] | |
for k, v in body.items(): | |
ls.append(k + '=' + v) | |
body['sign'] = sign('&'.join(ls)) | |
return body | |
def _refresh_token(self): | |
response = self.session.post( | |
headers={'Content-type': "application/x-www-form-urlencoded"}, | |
url='https://passport.bilibili.com/api/v2/oauth2/refresh_token', | |
params=self._signed_body( | |
f'access_key={self.access_token}&appkey={self._APPKEY}&refresh_token={self.refresh_token}' | |
f'&ts={int(time.time())}') | |
) | |
res = response.json() | |
assert res['code'] == 0, res['message'] | |
data = res['data']['token_info'] | |
self.access_token = data['access_token'] | |
self.refresh_token = data['refresh_token'] | |
self.mid = data['mid'] | |
# token过期时间 | |
self._TOKENEXPIRES = data['expires_in'] | |
# 保存用户token | |
self._save_user_token() | |
def login(self, username, password): | |
def get_key(appkey): | |
# 获取加密所需数据 | |
_r = self.session.post( | |
url='https://passport.bilibili.com/api/oauth2/getKey', | |
params=self._signed_body(f'appkey={appkey}&platform=pc&ts={int(time.time())}'), | |
headers={'Content-type': "application/x-www-form-urlencoded"}, | |
) | |
# {"ts":1544152439,"code":0,"data":{"hash":"99c7573759582e0b","key":"-----BEGIN PUBLIC----- -----END | |
# PUBLIC KEY-----\n"}} | |
_res = _r.json() | |
assert _res['code'] == 0, _r.text | |
_data = _res['data'] | |
# 加密密码的hash、key, 以及cookie | |
return _data['hash'], _data['key'], _r.cookies['sid'] | |
# 获取加密key | |
h, k, self.sid = get_key(self._APPKEY) | |
# 加密密码 | |
pwd = base64.b64encode( | |
rsa.encrypt( | |
(h + password).encode('utf-8'), | |
rsa.PublicKey.load_pkcs1_openssl_pem(k.encode()), | |
), | |
) | |
# 编码 | |
username = parse.quote_plus(username) | |
password = parse.quote_plus(pwd) | |
# 签名 | |
post_data = self._signed_body(f'appkey={self._APPKEY}&password={password}&platform=pc' | |
f'&ts={int(time.time())}&username={username}') | |
response = self.session.post( | |
url="https://passport.bilibili.com/api/v3/oauth2/login", | |
data=post_data, | |
headers={'Content-type': "application/x-www-form-urlencoded"}, | |
) | |
res = response.json() | |
assert res['code'] == 0, response.text | |
status = res['data']['status'] | |
assert status == 0, response.text | |
# 获取token, 以及用户id | |
data = res['data']['token_info'] | |
self.access_token = data['access_token'] | |
self.refresh_token = data['refresh_token'] | |
self.mid = data['mid'] | |
# token过期时间 | |
self._TOKENEXPIRES = data['expires_in'] | |
# 保存用户token | |
self._save_user_token() | |
def upload_cover(self, file: str): | |
""" | |
上传预览图 | |
:param file: 只接受png文件格式 | |
:return: | |
""" | |
assert os.path.exists(file), '图片文件不存在' | |
with open(file, "rb") as f: | |
img = f.read() | |
params = self._signed_body(f'access_key={self.access_token}') | |
files = { | |
'file': ("cover.png", img, "Content-Type: image/png"), | |
} | |
# ?access_key=76fe8598172b7c348217d832f2ffa511&sign=8354da9b8dc51f2051d9cb844e060265 | |
r = self.session.post( | |
"http://member.bilibili.com/x/vu/client/cover/up", | |
params=params, | |
files=files | |
) | |
res = r.json() | |
log.debug(res) | |
assert res['code'] == 0, r.text | |
return res["data"]["url"] | |
def upload_video(self, videos: Union[VideoPart, List[VideoPart]], max_retry=5) -> List[VideoPart]: | |
""" | |
上传视频, 支持分P, 返回服务器上传成功后的文件名 | |
:param videos: | |
:param max_retry: 上传重试次数 | |
:return: | |
""" | |
def pre_upload(): | |
# 预上传, 返回上传链接 完成触发链接 以及 服务器文件名 | |
_response = self.session.get( | |
f'http://member.bilibili.com/preupload?access_key={self.access_token}&mid={self.mid}' | |
f'&profile=ugcfr%2Fpc3') | |
# PS: 这个链接不验证access_token以及cookie, 所以没有测试到错误返回, 错误处理部分缺失 | |
# { | |
# "complete": "http://upcdn-szqn.bilivideo.com/vs814/upload3-complete/xxxx", | |
# "OK": 1, | |
# "url": "http://upcdn-szqn.bilivideo.com/vs814/upload3/xxxx", | |
# "filename": "m210107142qhbrx3io8tjx1x0gg8vuh3" | |
# } | |
_data = _response.json() | |
assert _data['OK'] == 1, _response.text | |
return _data['filename'], _data['url'], _data['complete'] | |
def upload_chunk(url: str, data: VideoChunk): | |
for i in range(max_retry): | |
files = { | |
'version': (None, data.version), | |
'filesize': (None, data.filesize), | |
'chunk': (None, data.chunk), | |
'chunks': (None, data.chunks), | |
'md5': (None, data.md5), | |
'file': (data.filename, data.chunk_data, 'application/octet-stream') | |
} | |
_response = self.session.post( | |
url=url, | |
files=files, | |
timeout=60) | |
if _response.status_code == 200: | |
return _response | |
log.info(_response.text) | |
log.info('{}/{} retry stage {}/{}'.format(chunks_index + 1, chunks_num, i, max_retry)) | |
log.info('sleep %ds', 5 * i) | |
time.sleep(5 * i) | |
chunk_size = 2097906 | |
if not isinstance(videos, list): | |
videos = [videos] | |
retries = Retry( | |
total=max_retry, | |
backoff_factor=1, | |
status_forcelist=(504,), | |
) | |
self.session.mount('https://', HTTPAdapter(max_retries=retries)) | |
self.session.mount('http://', HTTPAdapter(max_retries=retries)) | |
# 分P上传后返回服务器文件名 | |
result = list() | |
# 分P上传 | |
for video in videos: | |
filepath = video.path | |
filesize = os.path.getsize(filepath) | |
filehash = hashlib.md5() | |
chunks_num = math.ceil(filesize / chunk_size) | |
server_filename, upload_url, complete_url = pre_upload() | |
self.session.cookies.update({'PHPSESSID': server_filename}) | |
with open(filepath, 'rb') as file: | |
chunks_index = 0 | |
while True: | |
chunk_data = file.read(chunk_size) | |
if not chunk_data: | |
break | |
response = upload_chunk(upload_url, VideoChunk( | |
filename=os.path.basename(filepath), | |
filesize=len(chunk_data), | |
md5=hashlib.md5(chunk_data).hexdigest(), | |
chunk=chunks_index, | |
chunks=chunks_num, | |
chunk_data=chunk_data, | |
)) | |
assert response.status_code == 200 and response.json()[ | |
'OK'] == 1, f'上传失败超过最大重试次数,{chunks_index}/{chunks_num}, data:{response.text}' | |
log.info('upload part {}/{}'.format(chunks_index + 1, chunks_num)) | |
chunks_index += 1 | |
filehash.update(chunk_data) | |
post_data = { | |
'chunks': chunks_num, | |
'filesize': filesize, | |
'md5': filehash.hexdigest(), | |
'name': os.path.basename(filepath), | |
'version': _TOOLS_VERSION, | |
} | |
response = self.session.post(url=complete_url, data=post_data) | |
assert response.status_code == 200 and response.json()['OK'] == 1, response.text | |
video.server_name = server_filename | |
result.append(video) | |
log.info(result) | |
return result | |
def add_video(self, | |
videos: Union[VideoPart, List[VideoPart]], | |
title: str, | |
desc: str, | |
tid: int, | |
tag: str, | |
cover: str = '', | |
source: str = '', | |
cr: int = 1, | |
no_reprint: int = 0, | |
open_elec: int = 1, | |
): | |
post_data = {"build": 1057, | |
# 自制1 转载2 | |
"copyright": cr, | |
# 预览图 | |
"cover": cover, | |
# 视频介绍 | |
"desc": desc, | |
# 禁止转载 0:无 1:禁止 | |
"no_reprint": no_reprint, | |
# 是否开启充电 0 or 1 | |
"open_elec": open_elec, | |
# 转载来源 | |
"source": source, | |
# 标签 | |
"tag": tag, | |
# 分区ID https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/video/video_zone.md | |
"tid": tid, | |
# 视频标题 | |
"title": title, | |
# 视频 | |
"videos": [ | |
# { | |
# # 介绍 | |
# "desc": "", | |
# # 服务器文件名 | |
# "filename": "m210107142qhbrx3io8tjx1x0gg8vuh3", | |
# # 分P标题 | |
# "title": "jaukFNrAPHo"} | |
] | |
} | |
for item in videos: | |
post_data['videos'].append({ | |
"desc": item.desc, | |
"filename": item.server_name, | |
"title": item.title | |
}) | |
response = self.session.post( | |
url=f'http://member.bilibili.com/x/vu/client/add', | |
params=self._signed_body(f'access_key={self.access_token}'), | |
json=post_data | |
) | |
assert response.status_code == 200 and response.json()['code'] == 0, response.text | |
data = response.json()["data"] | |
return data["aid"], data["bvid"] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment