Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@Virace
Created January 21, 2021 10:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Virace/f6036bfd2baa8129ec6b142230b10924 to your computer and use it in GitHub Desktop.
Save Virace/f6036bfd2baa8129ec6b142230b10924 to your computer and use it in GitHub Desktop.
哔哩哔哩视频上传
# -*- coding: utf-8 -*-
# @Time : 2021/1/8 22:29
# @Author : Virace
# @Email : Virace@aliyun.com
# @Site : x-item.com
# @Software: PyCharm
# @Detail : B站视频上传相关
import base64
import datetime
import hashlib
import json
import logging
import math
import os
import time
from dataclasses import dataclass
from typing import *
from urllib import parse
import requests
import rsa
from requests.adapters import HTTPAdapter
from urllib3 import Retry
# logging.captureWarnings(True)
# Module-level logger named after this module; no handler is attached here.
log = logging.getLogger(__name__)
# log.addHandler(logging.NullHandler())
# Upload-tool version string: the default for VideoChunk.version and the
# 'version' field of the upload-complete request.
_TOOLS_VERSION = '2.2.0.1057'
@dataclass(repr=False)
class VideoChunk:
    """One chunk of a video file, carrying the form fields sent with each chunk upload.

    ``repr`` generation is switched off, so printing or logging an instance
    shows the default object repr rather than the raw ``chunk_data`` bytes.
    """

    # File name sent with the chunk (upload_video passes the basename of the local path).
    filename: str
    # Size in bytes of this chunk (upload_video passes len(chunk_data)), not of the whole file.
    filesize: int
    # Hex MD5 digest of chunk_data.
    md5: str
    # Zero-based index of this chunk within the file.
    chunk: int
    # Total number of chunks the file is split into.
    chunks: int
    # Raw bytes of this chunk.
    chunk_data: bytes
    # Upload-tool version string sent alongside the chunk.
    version: str = _TOOLS_VERSION
@dataclass
class VideoPart:
    """One part ("P") of a submission: a local video file plus its per-part metadata."""

    # Local path of the video file; upload_video opens and uploads it.
    path: str
    # Part title, sent by add_video.
    title: str = ''
    # Part description, sent by add_video.
    desc: str = ''
    # Server-side file name; filled in by upload_video and sent by add_video as "filename".
    server_name: str = ''
class Bilibili:
    """Client for Bilibili's PC upload-tool API: login, token persistence and video upload.

    Requests are signed with the upload tool's appkey/secret (see ``_signed_body``).
    After a successful ``login`` or token refresh the tokens are written to a JSON
    token file, so a later instance can resume the session without a password.

    Failed server-side checks raise ``AssertionError`` carrying the server's message
    or raw response text. They are raised explicitly rather than via ``assert`` so
    the checks still run under ``python -O``.
    """

    # appkey and secret of the upload tool, used to sign requests
    _APPKEY = 'aae92bc66f3edfab'
    _APPSECRET = 'af125a0d5279fd576c1b4418a3e8276d'
    # Default token file name (resolved against the current working directory).
    _TOKENFILE = '_token'
    # Token lifetime in seconds (30 days); replaced per instance by the server's `expires_in`.
    _TOKENEXPIRES = 2592000
    # Path the token was loaded from; None means "save to _TOKENFILE".
    _token_path: Optional[str] = None

    def __init__(self, token_file: Optional[Union[str, bool]] = None):
        """Create the HTTP session and optionally resume a previously saved login.

        :param token_file: path of a saved token file, or ``True`` to use the default
            ``_TOKENFILE``. Any falsy value skips token loading; call ``login`` instead.
        :raises TypeError: if ``token_file`` is truthy but neither ``str`` nor ``bool``.
        :raises AssertionError: if the token file does not exist or the server rejects
            the stored token.
        """
        self.session = requests.session()
        # NOTE(review): TLS certificate verification is disabled for every request made
        # through this session, including the ones carrying the password and tokens.
        # Kept as-is to preserve behaviour; confirm whether this is still required.
        self.session.verify = False

        # Debug aid: log the body of every response at DEBUG level.
        def debug_response(r, *args, **kwargs):
            log.debug(r.text)

        self.session.hooks = {'response': debug_response}
        self.session.headers.update({
            'Accept': None,
            # 'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': ''
        })
        self.access_token = None
        self.refresh_token = None
        # value of the `sid` cookie
        self.sid = None
        # user id
        self.mid = None
        # Unix timestamp at which the stored token expires (read from the token file).
        self.cookie_time: Union[int, None] = None

        if token_file:
            if isinstance(token_file, str):
                path = token_file
            elif isinstance(token_file, bool):
                path = self._TOKENFILE
            else:
                raise TypeError('Token文件参数类型错误.')
            # Remember where the token came from so a refresh is written back to the same file.
            self._token_path = path
            self._load_token_file(path)
            # Refresh the token if it is close to expiring.
            self._check_token_time()
            self.session.cookies.update({'sid': self.sid})
            if self.access_token and self.refresh_token:
                self._get_user_info()

    def _check_token_time(self):
        """Refresh the token when the stored expiry is less than three days away.

        Does nothing when no expiry time has been loaded.
        """
        if not self.cookie_time:
            return
        expires_at = datetime.datetime.fromtimestamp(self.cookie_time)
        remaining = expires_at - datetime.datetime.now()
        # timedelta.days is floored, so this also covers tokens that have already expired.
        if remaining.days <= 2:
            self._refresh_token()

    def _get_user_info(self):
        """Fetch the account info for the current access token and store the user id in ``self.mid``.

        :raises requests.HTTPError: on a non-2xx HTTP status.
        :raises AssertionError: if the API returns a non-zero ``code``.
        """
        query = self._signed_body(
            f'access_key={self.access_token}&appkey={self._APPKEY}&platform=pc&ts={int(time.time())}')
        response = self.session.get(
            url="https://app.bilibili.com/x/v2/account/myinfo",
            params=query
        )
        response.raise_for_status()
        res = response.json()
        if res['code'] != 0:
            raise AssertionError(res['message'])
        info = res['data']
        self.mid = info['mid']
        log.info(info['name'])

    def _load_token_file(self, token_file):
        """Load tokens, expiry time and ``sid`` from a JSON token file.

        :param token_file: path of a file previously written by ``_save_user_token``.
        :raises AssertionError: if the file does not exist.
        :raises KeyError: if the file lacks one of the expected keys.
        """
        if not os.path.exists(token_file):
            raise AssertionError('token文件不存在')
        with open(token_file, encoding='utf-8') as f:
            data = json.load(f)
        self.access_token = data['access_token']
        self.refresh_token = data['refresh_token']
        self.cookie_time = data['cookie_time']
        self.sid = data['sid']

    def _save_user_token(self):
        """Write the current tokens to the token file as JSON.

        The file written is the one the token was loaded from, or ``_TOKENFILE`` when
        none was loaded. Nothing is written unless both tokens are set.
        ``cookie_time`` is stored as "now + ``_TOKENEXPIRES`` seconds".
        """
        if not (self.access_token and self.refresh_token):
            return
        expires_at = datetime.datetime.now() + datetime.timedelta(seconds=self._TOKENEXPIRES)
        payload = dict(
            access_token=self.access_token,
            refresh_token=self.refresh_token,
            sid=self.sid,
            cookie_time=expires_at.timestamp(),
        )
        # Build the payload before opening so a failure above cannot truncate an existing file.
        with open(self._token_path or self._TOKENFILE, 'w+', encoding='utf-8') as f:
            json.dump(payload, f)

    def _signed_body(self, body: Union[str, dict]) -> Union[str, dict]:
        """Append the API signature to a request body.

        The signature is the hex MD5 of the query string followed by ``_APPSECRET``.

        :param body: either a ready-made query string, or a dict of parameters.
            Dict items are joined as ``key=value`` in the dict's own iteration order
            (no sorting or URL-encoding is applied here); values are converted with ``str``.
        :return: for a string, ``body + '&sign=<md5>'``; for a dict, the same dict
            object with a ``'sign'`` entry added in place.
        :raises TypeError: if ``body`` is neither ``str`` nor ``dict``.
        """
        def sign(query: str) -> str:
            return hashlib.md5((query + self._APPSECRET).encode('utf-8')).hexdigest()

        if isinstance(body, str):
            return body + '&sign=' + sign(body)
        if isinstance(body, dict):
            body['sign'] = sign('&'.join(f'{k}={v}' for k, v in body.items()))
            return body
        raise TypeError(f'body must be str or dict, not {type(body).__name__}')

    def _apply_token_info(self, token_info: dict):
        """Store the tokens, user id and lifetime from a ``token_info`` payload, then persist them."""
        self.access_token = token_info['access_token']
        self.refresh_token = token_info['refresh_token']
        self.mid = token_info['mid']
        # token lifetime in seconds, as reported by the server
        self._TOKENEXPIRES = token_info['expires_in']
        # persist the user's token
        self._save_user_token()

    def _refresh_token(self):
        """Exchange the refresh token for a new access token and save the result.

        :raises AssertionError: if the API returns a non-zero ``code``.
        """
        response = self.session.post(
            headers={'Content-type': "application/x-www-form-urlencoded"},
            url='https://passport.bilibili.com/api/v2/oauth2/refresh_token',
            params=self._signed_body(
                f'access_key={self.access_token}&appkey={self._APPKEY}&refresh_token={self.refresh_token}'
                f'&ts={int(time.time())}')
        )
        res = response.json()
        if res['code'] != 0:
            raise AssertionError(res['message'])
        self._apply_token_info(res['data']['token_info'])

    def login(self, username, password):
        """Log in with user name and password, then store and save the returned tokens.

        The password is prefixed with a server-issued hash, RSA-encrypted with the
        server-issued public key and base64-encoded before being sent.

        :param username: account name.
        :param password: plain-text password.
        :raises AssertionError: if fetching the key or logging in is rejected by the
            server (the message is the raw response text).
        """
        def get_key(appkey):
            # fetch the data needed to encrypt the password
            _r = self.session.post(
                url='https://passport.bilibili.com/api/oauth2/getKey',
                params=self._signed_body(f'appkey={appkey}&platform=pc&ts={int(time.time())}'),
                headers={'Content-type': "application/x-www-form-urlencoded"},
            )
            # Example response:
            # {"ts":1544152439,"code":0,"data":{"hash":"99c7573759582e0b","key":"-----BEGIN PUBLIC----- -----END
            # PUBLIC KEY-----\n"}}
            _res = _r.json()
            if _res['code'] != 0:
                raise AssertionError(_r.text)
            _data = _res['data']
            # hash and key used to encrypt the password, plus the sid cookie
            return _data['hash'], _data['key'], _r.cookies['sid']

        # fetch the encryption key
        key_hash, public_key, self.sid = get_key(self._APPKEY)
        # encrypt the password
        encrypted = base64.b64encode(
            rsa.encrypt(
                (key_hash + password).encode('utf-8'),
                rsa.PublicKey.load_pkcs1_openssl_pem(public_key.encode()),
            ),
        )
        # URL-encode
        username = parse.quote_plus(username)
        password = parse.quote_plus(encrypted)
        # sign
        post_data = self._signed_body(f'appkey={self._APPKEY}&password={password}&platform=pc'
                                      f'&ts={int(time.time())}&username={username}')
        response = self.session.post(
            url="https://passport.bilibili.com/api/v3/oauth2/login",
            data=post_data,
            headers={'Content-type': "application/x-www-form-urlencoded"},
        )
        res = response.json()
        if res['code'] != 0:
            raise AssertionError(response.text)
        if res['data']['status'] != 0:
            raise AssertionError(response.text)
        # tokens, user id and lifetime
        self._apply_token_info(res['data']['token_info'])

    def upload_cover(self, file: str):
        """Upload a cover (preview) image.

        :param file: path of the image; only PNG files are accepted.
        :return: the ``url`` of the uploaded cover as returned by the server.
        :raises AssertionError: if the file does not exist or the API returns a
            non-zero ``code``.
        """
        if not os.path.exists(file):
            raise AssertionError('图片文件不存在')
        with open(file, "rb") as f:
            img = f.read()
        params = self._signed_body(f'access_key={self.access_token}')
        files = {
            # (filename, content, content_type): the third element is the bare MIME
            # type; requests adds the "Content-Type:" header name itself.
            'file': ("cover.png", img, "image/png"),
        }
        # ?access_key=76fe8598172b7c348217d832f2ffa511&sign=8354da9b8dc51f2051d9cb844e060265
        r = self.session.post(
            "http://member.bilibili.com/x/vu/client/cover/up",
            params=params,
            files=files
        )
        res = r.json()
        log.debug(res)
        if res['code'] != 0:
            raise AssertionError(r.text)
        return res["data"]["url"]

    def upload_video(self, videos: 'Union[VideoPart, List[VideoPart]]', max_retry=5) -> 'List[VideoPart]':
        """Upload one or more video parts in chunks.

        :param videos: a single ``VideoPart`` or a list of them.
        :param max_retry: maximum number of attempts per chunk (at least one attempt
            is always made); also used as the retry budget for HTTP 504 responses on
            the session's adapters.
        :return: the same ``VideoPart`` objects, each with ``server_name`` set to the
            file name assigned by the server.
        :raises AssertionError: if pre-upload, a chunk upload (after all attempts) or
            the completion request fails.
        """
        # size in bytes of each uploaded chunk
        chunk_size = 2097906

        def pre_upload():
            # Pre-upload: returns the server file name, the upload URL and the completion URL.
            _response = self.session.get(
                f'http://member.bilibili.com/preupload?access_key={self.access_token}&mid={self.mid}'
                f'&profile=ugcfr%2Fpc3')
            # PS: this endpoint validates neither access_token nor cookie, so no error
            # response could be observed; error handling for it is therefore missing.
            # {
            # "complete": "http://upcdn-szqn.bilivideo.com/vs814/upload3-complete/xxxx",
            # "OK": 1,
            # "url": "http://upcdn-szqn.bilivideo.com/vs814/upload3/xxxx",
            # "filename": "m210107142qhbrx3io8tjx1x0gg8vuh3"
            # }
            _data = _response.json()
            if _data['OK'] != 1:
                raise AssertionError(_response.text)
            return _data['filename'], _data['url'], _data['complete']

        def upload_chunk(url: str, data: VideoChunk):
            # Post one chunk, retrying on non-200 statuses. Always returns the LAST
            # response so the caller can report it when every attempt failed.
            attempts = max(1, max_retry)
            _response = None
            for attempt in range(attempts):
                files = {
                    'version': (None, data.version),
                    'filesize': (None, data.filesize),
                    'chunk': (None, data.chunk),
                    'chunks': (None, data.chunks),
                    'md5': (None, data.md5),
                    'file': (data.filename, data.chunk_data, 'application/octet-stream')
                }
                _response = self.session.post(
                    url=url,
                    files=files,
                    timeout=60)
                if _response.status_code == 200:
                    break
                log.info(_response.text)
                log.info('{}/{} retry stage {}/{}'.format(data.chunk + 1, data.chunks, attempt, max_retry))
                # No point in sleeping after the final attempt.
                if attempt + 1 < attempts:
                    log.info('sleep %ds', 5 * attempt)
                    time.sleep(5 * attempt)
            return _response

        if not isinstance(videos, list):
            videos = [videos]
        retries = Retry(
            total=max_retry,
            backoff_factor=1,
            status_forcelist=(504,),
        )
        self.session.mount('https://', HTTPAdapter(max_retries=retries))
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        # parts that finished uploading, with their server file names filled in
        result = []
        # upload part by part
        for video in videos:
            filepath = video.path
            filesize = os.path.getsize(filepath)
            # MD5 of the whole file, accumulated chunk by chunk
            filehash = hashlib.md5()
            chunks_num = math.ceil(filesize / chunk_size)
            server_filename, upload_url, complete_url = pre_upload()
            self.session.cookies.update({'PHPSESSID': server_filename})
            with open(filepath, 'rb') as file:
                # iter(callable, sentinel) stops at the first empty read, i.e. at EOF.
                for chunks_index, chunk_data in enumerate(iter(lambda: file.read(chunk_size), b'')):
                    response = upload_chunk(upload_url, VideoChunk(
                        filename=os.path.basename(filepath),
                        filesize=len(chunk_data),
                        md5=hashlib.md5(chunk_data).hexdigest(),
                        chunk=chunks_index,
                        chunks=chunks_num,
                        chunk_data=chunk_data,
                    ))
                    if not (response.status_code == 200 and response.json()['OK'] == 1):
                        raise AssertionError(
                            f'上传失败超过最大重试次数,{chunks_index}/{chunks_num}, data:{response.text}')
                    log.info('upload part {}/{}'.format(chunks_index + 1, chunks_num))
                    filehash.update(chunk_data)
            post_data = {
                'chunks': chunks_num,
                'filesize': filesize,
                'md5': filehash.hexdigest(),
                'name': os.path.basename(filepath),
                'version': _TOOLS_VERSION,
            }
            response = self.session.post(url=complete_url, data=post_data)
            if not (response.status_code == 200 and response.json()['OK'] == 1):
                raise AssertionError(response.text)
            video.server_name = server_filename
            result.append(video)
        log.info(result)
        return result

    def add_video(self,
                  videos: 'Union[VideoPart, List[VideoPart]]',
                  title: str,
                  desc: str,
                  tid: int,
                  tag: str,
                  cover: str = '',
                  source: str = '',
                  cr: int = 1,
                  no_reprint: int = 0,
                  open_elec: int = 1,
                  ):
        """Submit already-uploaded video parts as a new submission.

        :param videos: a single ``VideoPart`` or an iterable of them; each must have
            ``server_name`` set (see ``upload_video``).
        :param title: submission title.
        :param desc: submission description.
        :param tid: zone (category) id, see
            https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/video/video_zone.md
        :param tag: tags.
        :param cover: cover image (e.g. the URL returned by ``upload_cover``).
        :param source: source of a reposted video.
        :param cr: copyright: 1 = original work, 2 = repost.
        :param no_reprint: 0 = no restriction, 1 = reposting forbidden.
        :param open_elec: whether to enable "charging" (0 or 1).
        :return: tuple ``(aid, bvid)`` of the created submission.
        :raises AssertionError: on a non-200 status or a non-zero API ``code``.
        """
        # The signature allows a single part; wrap it so the loop below works.
        if isinstance(videos, VideoPart):
            videos = [videos]
        post_data = {
            "build": 1057,
            "copyright": cr,
            "cover": cover,
            "desc": desc,
            "no_reprint": no_reprint,
            "open_elec": open_elec,
            "source": source,
            "tag": tag,
            "tid": tid,
            "title": title,
            # One entry per part, e.g.
            # {"desc": "", "filename": "m210107142qhbrx3io8tjx1x0gg8vuh3", "title": "jaukFNrAPHo"}
            "videos": [
                {
                    "desc": item.desc,
                    "filename": item.server_name,
                    "title": item.title
                }
                for item in videos
            ],
        }
        response = self.session.post(
            url='http://member.bilibili.com/x/vu/client/add',
            params=self._signed_body(f'access_key={self.access_token}'),
            json=post_data
        )
        if response.status_code != 200:
            raise AssertionError(response.text)
        res = response.json()
        if res['code'] != 0:
            raise AssertionError(response.text)
        data = res["data"]
        return data["aid"], data["bvid"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment