Skip to content

Instantly share code, notes, and snippets.

@RicterZ
Last active January 14, 2023 09:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RicterZ/ed3517b944a04e18c91aacfe3d9aff24 to your computer and use it in GitHub Desktop.
Save RicterZ/ed3517b944a04e18c91aacfe3d9aff24 to your computer and use it in GitHub Desktop.
Bilibili TV Controller written in Python
# Bilibili TV Controller
#
# Requirements
# - qrencode_ascii
# - zio
import threading
import json
import time
import logging
import sys
import requests
import re
import socket
import struct
import random
import os
import hashlib
import qrencode_ascii
from urllib.parse import urlparse
from functools import wraps
from enum import Enum
from zio import zio, b8, b32
# Root logger shared by every function in this script.
logger = logging.getLogger()

# Handshake written verbatim to the TV right after connecting; each header
# line ends with CRLF and the request is terminated by an empty line.
SETUP_MSG = (
    'SETUP /projection NVA/1.0\r\n'
    'Session: deadbeef-1337-1337-1337-deadbeef1337\r\n'
    'Connection: Keep-Alive\r\n'
    '\r\n'
)
class ColorFormatter(logging.Formatter):
    """Formatter that wraps ``[LEVEL]: message`` in an ANSI colour per level.

    Records whose level has no entry in ``FORMATS`` fall back to the default
    ``logging.Formatter`` layout (the bare message).
    """

    grey = "\x1b[38;20m"
    blue = "\x1b[34;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    # Shared layout. Kept under its own name so it is not clobbered by the
    # format() method defined below.
    _template = "[%(levelname)s]: %(message)s"

    FORMATS = {
        logging.DEBUG: grey + _template + reset,
        logging.INFO: blue + _template + reset,
        logging.WARNING: yellow + _template + reset,
        logging.ERROR: red + _template + reset,
        logging.CRITICAL: bold_red + _template + reset,
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One delegate formatter per format string, built on first use
        # instead of once per record.
        self._delegates = {}

    def format(self, record):
        """Return *record* rendered with the colour mapped to its level."""
        log_fmt = self.FORMATS.get(record.levelno)
        try:
            delegate = self._delegates[log_fmt]
        except KeyError:
            delegate = self._delegates[log_fmt] = logging.Formatter(log_fmt)
        return delegate.format(record)
class Flag(Enum):
    """One-byte marker that opens every frame on the TV connection."""

    # Ping frame; the TV sends one every second.
    PING = bytes([0xE4])
    # Frame carrying a command sent by the client.
    COMMAND = bytes([0xE0])
    # Frame carrying a command sent by the TV.
    RESPONSE = bytes([0xC0])
def check_settle(playing=True):
    """Build a decorator that blocks a method until the TV is ready.

    The decorated callable's first positional argument must expose
    ``settle`` and ``playing`` attributes. The call is skipped (returning
    ``None`` after an info log) when ``settle`` is falsy, or when *playing*
    is true and the object's ``playing`` attribute is falsy.
    """
    def decorator(method):
        @wraps(method)
        def guarded(*args, **kwargs):
            owner = args[0]
            if not owner.settle:
                logger.info('TV not settle, please run setup first')
                return None
            if playing and not owner.playing:
                logger.info('Video not playing, cannot change states')
                return None
            return method(*args, **kwargs)

        return guarded

    return decorator
def get_epid_and_ssid(aid):
    """Look up the bangumi episode id and season id behind an ``av`` number.

    A HEAD request (redirects not followed) is sent for the video page. When
    the response carries a ``location`` header whose last path segment starts
    with ``ep``, that location is fetched and searched for a
    ``https://www.bilibili.com/bangumi/play/ss<digits>/`` link.

    Args:
        aid: The numeric part of the ``av`` id; interpolated into the URL.

    Returns:
        ``(epid, ssid)`` as strings, with the ``ep`` prefix stripped from
        ``epid``, when both could be determined; ``(0, 0)`` otherwise.
    """
    url = 'https://www.bilibili.com/video/av{}/'.format(aid)
    headers = requests.head(url, allow_redirects=False).headers
    if 'location' not in headers:
        return 0, 0

    location = headers['location']
    epid = location.split('/')[-1]
    if not epid.startswith('ep'):
        return 0, 0

    resp = requests.get(location).text
    # Raw string: "\d" in a plain literal is an invalid escape sequence, and
    # the dots are escaped so they only match literal dots.
    ssid = re.findall(r'https://www\.bilibili\.com/bangumi/play/ss(\d+)/', resp)
    if ssid:
        return epid[2:], ssid[0]
    return 0, 0
def bv_decode(code):
    """Translate a ``BV`` video id into the corresponding numeric ``av`` id.

    Six characters of *code* (at indexes 11, 10, 3, 8, 4 and 6, least
    significant first) are read as base-58 digits over a fixed alphabet; a
    constant is then subtracted and the result XOR-ed with a fixed mask.

    Raises:
        KeyError: A selected character is not in the alphabet.
        IndexError: *code* is shorter than 12 characters.
    """
    alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
    digit_of = {symbol: value for value, symbol in enumerate(alphabet)}
    positions = (11, 10, 3, 8, 4, 6)
    xor_mask = 177451812
    offset = 8728348608

    total = 0
    for power, pos in enumerate(positions):
        total += digit_of[code[pos]] * 58 ** power
    return (total - offset) ^ xor_mask
def restore():
    """Placeholder with no implementation yet; always returns ``None``."""
    return None
def save(obj):
    """Placeholder with no implementation yet; ignores *obj*, returns ``None``."""
    return None
class BilibiliTV():
    """Remote controller for a Bilibili TV client.

    ``setup`` opens the connection and starts a background thread running
    ``dispatch``, which parses every incoming frame and mirrors the state the
    TV reports (position, speed, quality, volume, danmaku switch, video
    info) onto this object. The other public methods build command frames
    with ``make_command`` and write them to the connection.
    """

    # Seconds to wait for a reply or an HTTP response before giving up.
    RESPONSE_TIMEOUT = 3

    # zio connection to the TV; created by setup()
    io = None
    # (host, port) of the TV
    target = None
    # UDNs already reported by discover(); a fresh list is created per instance
    discovered = None
    danmaku = False
    # is video playing
    playing = False
    # video position
    position = 0
    # video info
    info = None
    # video speed
    speed = 1
    supported_speeds = None
    # video quality
    quality = None
    supported_qualities = None
    # video volume
    volume = 0
    # setup
    settle = False
    dispatcher_thread = None
    # serial -> list of raw parts (or None) for every frame seen by dispatch()
    command_response = None
    access_key = None

    def __init__(self, addr=None):
        """Create a controller, optionally remembering the TV's (host, port)."""
        self.target = addr
        self.command_response = {}
        # Per-instance list so that discoveries are not shared between objects.
        self.discovered = []

    def ping(self):
        """Write a zero-part RESPONSE frame with the fixed serial 0x1337."""
        msg = Flag.RESPONSE.value
        msg += b'\x00'
        msg += b32(0x1337)
        self.io.write(msg)

    def make_command(self, command, args=None):
        """Build a COMMAND frame.

        Args:
            command: Command name, e.g. ``'Play'``.
            args: Optional JSON-serialisable object sent as the single
                argument of the command.

        Returns:
            ``(serial, frame)`` where ``serial`` is a random 32-bit number
            not yet present in ``command_response`` and ``frame`` is the
            bytes to write to the connection.
        """
        while True:
            serial = random.randint(0, 0xffffffff)
            if serial not in self.command_response:
                break
        logger.debug('Make command {} with serial {} ...'.format(command, serial))

        # for now, only 1 argument been used
        payload = [] if args is None else [json.dumps(args).encode()]
        name = command.encode()

        # message flag
        msg = Flag.COMMAND.value
        # message parts count ("Command" + name + arguments)
        msg += b8(len(payload) + 2)
        # message serial number
        msg += b32(serial)
        msg += b'\x01'
        # command string
        msg += b'\x07Command'
        msg += b8(len(name))
        msg += name
        # arguments
        for part in payload:
            msg += b32(len(part))
            msg += part
        return serial, msg

    def dispatch(self):
        """Read and process frames until the dispatcher thread is told to stop.

        Frame layout as parsed here: one flag byte, a one-byte part count, a
        four-byte big-endian serial, then that many length-prefixed parts.
        The first two parts use a one-byte length unless the flag is
        RESPONSE; every other part uses a four-byte length. COMMAND frames
        carry one extra byte before the parts, which is discarded.

        A PING frame marks the connection as settled. A frame with a single
        part is treated as a JSON reply; otherwise the first two parts form
        the event name and the remaining parts are JSON arguments.

        Raises:
            Exception: The flag byte is not a known ``Flag`` value.
        """
        while getattr(self.dispatcher_thread, 'not_stop', True):
            flag = self.io.read(1)
            if flag not in Flag._value2member_map_:
                logger.error('Unknown message flag: {}'.format(repr(flag)))
                raise Exception('Unknown message flag')

            parts = int.from_bytes(self.io.read(1), 'big')
            serial = int.from_bytes(self.io.read(4), 'big')
            logger.debug('Get {} message with serial {}'.format(Flag(flag).name, serial))

            if flag == Flag.PING.value:
                self.settle = True

            if parts == 0:
                self.command_response[serial] = None
                continue

            # one more byte in commands from client
            if flag == Flag.COMMAND.value:
                self.io.read(1)

            result = []
            for i in range(parts):
                # skip Command XXXX which length is only 1 byte
                width = 1 if i <= 1 and flag != Flag.RESPONSE.value else 4
                length = int.from_bytes(self.io.read(width), 'big')
                result.append(self.io.read(length))

            if len(result) == 1:
                ret = json.loads(result[0])
                if 'volume' in ret:
                    self.volume = ret['volume']
                else:
                    logger.info(ret)
                # Publish the serial only after the state update so a caller
                # waiting on it (get_volume) reads the fresh value.
                self.command_response[serial] = result
                continue

            self.command_response[serial] = result
            command = '{}.{}'.format(result[0].decode(), result[1].decode())
            logger.debug('Received {}'.format(command))

            arguments = []
            for arg in result[2:]:
                arguments.append(json.loads(arg))
                logger.debug('argument: {}'.format(arg.decode()))

            self._handle_event(command, arguments)

    def _handle_event(self, command, arguments):
        """Apply one ``Command.*`` notification from the TV to the cached state."""
        if command == 'Command.OnProgress':
            self.position = arguments[0]['position']
            self.playing = True
        elif command == 'Command.SpeedChanged':
            self.speed = arguments[0]['currSpeed']
            self.supported_speeds = arguments[0]['supportSpeedList']
            logger.debug('Current speed: {}, supported speeds: {}'.format(
                arguments[0]['currSpeed'],
                ' / '.join(map(str, arguments[0]['supportSpeedList'])))
            )
        elif command == 'Command.OnDanmakuSwitch':
            self.danmaku = arguments[0]['open']
            logger.debug('Danmaku switch is {}'.format(arguments[0]['open']))
        elif command == 'Command.PLAY_SUCCESS':
            self.playing = True
            logger.debug('Start playing video command successfully')
        elif command == 'Command.OnPlayState':
            state = arguments[0]['playState']
            if state == 5:
                logger.debug('Video playing paused')
            elif state == 4:
                logger.debug('Video playing resumed')
            elif state == 7:
                self.playing = False
                logger.debug('Video playing stopped')
        elif command == 'Command.OnQnSwitch':
            self.quality = arguments[0]['curQn']
            self.supported_qualities = arguments[0]['supportQnList']
        elif command == 'Command.OnEpisodeSwitch':
            self.info = arguments[0]
            logger.debug('Playing video {} (av{})'.format(self.info['title'],
                                                          self.info['playItem']['aid']))
        elif command == 'Command.Error':
            logger.warning('Error: {}'.format(arguments[0]['errorCode']))
        else:
            logger.debug('Unimplemented command {}'.format(command))

    def discover(self):
        """Listen for SSDP announcements and log every new Bilibili TV found.

        Joins the multicast group 239.255.255.250:1900, inspects five
        datagrams (sleeping one second after each) and always closes the
        socket afterwards.
        """
        logger.info('Starting discovering Bilibili TV ...')
        group = '239.255.255.250'
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((group, 1900))
            # "=" selects standard sizes without padding, i.e. the 8-byte
            # ip_mreq layout, regardless of the platform's native long.
            mreq = struct.pack('=4sl', socket.inet_aton(group), socket.INADDR_ANY)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

            for _ in range(5):
                data = sock.recv(1024).decode(errors='replace')
                self._inspect_announcement(data)
                time.sleep(1)
            logger.info('Discover finished')
        finally:
            sock.close()

    def _inspect_announcement(self, data):
        """Fetch the first LOCATION URL in an SSDP packet and report a new TV."""
        for line in data.split('\n'):
            line = line.strip()
            if not line.lower().startswith('location'):
                continue

            link = line.partition(':')[2].strip()
            try:
                resp = requests.get(link, timeout=self.RESPONSE_TIMEOUT).text
            except requests.RequestException as err:
                logger.debug('Cannot fetch device description {}: {}'.format(link, err))
                return

            # bilibili
            if 'Bilibili Inc.' in resp:
                name = re.findall('<friendlyName>(.*)</friendlyName>', resp)
                name = 'Unknown' if not name else name[0]
                udn = re.findall('<UDN>uuid:(.*)</UDN>', resp)
                if udn and udn[0] not in self.discovered:
                    self.discovered.append(udn[0])
                    parsed = urlparse(link)
                    addr = (parsed.hostname, parsed.port)
                    logger.info('Found Bilibili TV: {}:{} ({})'.format(*addr, name))
            # Only the first LOCATION header of a packet is considered.
            return

    def setup(self, addr=None):
        """Connect to the TV, start the dispatcher and wait until it settles.

        Args:
            addr: ``(host, port)`` of the TV. When omitted, the address given
                to the constructor (or to an earlier ``setup`` call) is used.
        """
        if addr is not None:
            self.target = addr
        if self.target is None:
            logger.error('Target address not be set')
            return

        logger.info('Initialize connection ...')
        self.io = zio((self.target), print_write=False, print_read=False)
        self.io.write(SETUP_MSG)
        self.io.read_until('\r\n\r\n')

        self.dispatcher_thread = threading.Thread(target=self.dispatch)
        self.dispatcher_thread.start()

        # Ping until dispatch() has seen a PING frame from the TV, giving up
        # after the fourth unanswered ping.
        c = 0
        while not self.settle:
            self.ping()
            if c >= 3:
                logger.error('setup failed for no response')
                return
            c += 1
            time.sleep(1)

        self.get_volume()
        logger.info('Setup TV connection successfully')

    @check_settle(playing=False)
    def set_volume(self, volume):
        """Set the TV volume.

        Args:
            volume: A non-negative integer, or a string of decimal digits.
                Anything else is rejected with a warning.
        """
        text = str(volume)
        if not text.isdigit():
            logger.warning('Invalid volume: {}'.format(volume))
            return
        logger.info('Set the volume to {}'.format(text))
        self.volume = int(text)
        _, msg = self.make_command('SetVolume', {'volume': self.volume})
        self.io.write(msg)

    @check_settle(playing=False)
    def get_volume(self):
        """Ask the TV for its volume and return the value once it has replied.

        Waits up to ``RESPONSE_TIMEOUT`` seconds for ``dispatch`` to record a
        frame with the request's serial; on timeout the last known volume is
        returned instead.
        """
        # NOTE(review): assumes the TV echoes the request serial in its
        # reply - confirm against a capture.
        serial, msg = self.make_command('GetVolume')
        self.io.write(msg)

        deadline = time.monotonic() + self.RESPONSE_TIMEOUT
        while serial not in self.command_response:
            if time.monotonic() >= deadline:
                logger.warning('No reply to GetVolume, using last known volume')
                break
            time.sleep(0.05)
        return self.volume

    @check_settle(playing=False)
    def play(self, aid=0, biz_id=0, room_id=0, epid=0, season_id=0, cid=0):
        """Send a Play command and wait briefly for the TV to report the video.

        When neither *epid* nor *season_id* is given they are resolved from
        *aid* with ``get_epid_and_ssid``. After sending, ``self.info`` is
        polled once per second (four checks in total) for the payload stored
        by ``dispatch`` on ``Command.OnEpisodeSwitch``.
        """
        # Forget state reported for the previous video.
        self.speed = 1
        self.quality = None
        self.supported_speeds = None
        self.supported_qualities = None
        self.info = None

        if epid == 0 and season_id == 0:
            epid, season_id = get_epid_and_ssid(aid)
        content_type = 1 if epid else 0

        data = {
            'biz_id': biz_id,
            'roomId': room_id,
            'oid': aid,
            'aid': aid,
            'epId': epid,
            'cid': cid,
            'seasonId': season_id,
            'type': 0,
            'contentType': content_type,
            'autoNext': 'true',
            'userDesireQn': '112',
            'accessKey': self.access_key
        }
        if self.access_key is None:
            logger.warning('access_key is not be set, cannot play high quality videos, try login')

        logger.info('Sending play video command ...')
        _, msg = self.make_command('Play', data)
        self.io.write(msg)

        for attempt in range(4):
            # Snapshot: the dispatcher thread may replace self.info meanwhile.
            info = self.info
            if info is not None:
                logger.info('Playing video: {}(av{})'.format(info['title'],
                                                             info['playItem']['aid']))
                current = ''
                labels = []
                for k in info['qnDesc']['supportQnList']:
                    if info['qnDesc']['curQn'] == k['quality']:
                        current = k['description']
                    labels.append('{}({})'.format(k['description'], k['quality']))
                logger.info('Current quality: {}'.format(current))
                logger.info('Supported quality: {}'.format(' / '.join(labels)))
                break
            if attempt == 3:
                logger.error('Play video failed, please check the video ID')
                break
            time.sleep(1)

    @check_settle()
    def stop(self):
        """Send a Stop command and mark playback as stopped."""
        logger.info('Stop playing video ...')
        self.playing = False
        _, msg = self.make_command('Stop')
        self.io.write(msg)
        # NOTE(review): this also asks the dispatcher loop to exit, so no
        # further frames are processed after a stop - confirm this is intended.
        self.dispatcher_thread.not_stop = False

    @check_settle()
    def pause(self):
        """Send a Pause command."""
        logger.info('Pause video playing ...')
        _, msg = self.make_command('Pause')
        self.io.write(msg)

    @check_settle()
    def resume(self):
        """Send a Resume command."""
        logger.info('Resume video playing ...')
        _, msg = self.make_command('Resume')
        self.io.write(msg)

    @check_settle()
    def seek(self, t):
        """Send a Seek command with ``seekTs`` set to *t*."""
        logger.info('Seek to {} sec ...'.format(t))
        _, msg = self.make_command('Seek', {'seekTs': t})
        self.io.write(msg)

    def set_seek(self, position):
        """Alias of ``seek`` so the prompt's generic set_<name> lookup works."""
        self.seek(position)

    def get_seek(self):
        """Return the last playback position reported by the TV."""
        return self.position

    @check_settle()
    def set_speed(self, speed):
        """Send a SwitchSpeed command if *speed* is one the TV reported as supported."""
        logger.info('Set speed to {} ...'.format(speed))
        try:
            wanted = float(speed)
        except (TypeError, ValueError):
            logger.warning('Unsupported speed specified')
            return
        # supported_speeds stays None until the TV has sent SpeedChanged.
        if wanted not in (self.supported_speeds or ()):
            logger.warning('Unsupported speed specified')
            return
        _, msg = self.make_command('SwitchSpeed', {'speed': speed})
        self.io.write(msg)

    @check_settle()
    def get_speed(self):
        """Return the last playback speed reported by the TV."""
        return self.speed

    @check_settle(playing=False)
    def toggle_danmaku(self):
        """Flip the cached danmaku switch and send it with SwitchDanmaku."""
        self.danmaku = not self.danmaku
        logger.info('Toggle danmaku to {} ...'.format(self.danmaku))
        _, msg = self.make_command('SwitchDanmaku', {'open': self.danmaku})
        self.io.write(msg)

    @check_settle()
    def set_quality(self, quality):
        """Send a SwitchQn command if *quality* is one the TV reported as supported."""
        if not self.playing:
            return
        try:
            wanted = int(quality)
        except (TypeError, ValueError):
            logger.warning('Unsupported quality specified')
            return
        # supported_qualities stays None until the TV has sent OnQnSwitch.
        supported = [k['quality'] for k in (self.supported_qualities or ())]
        if wanted not in supported:
            logger.warning('Unsupported quality specified')
            return
        logger.info('Set quality to {} ...'.format(quality))
        _, msg = self.make_command('SwitchQn', {'qn': quality})
        self.io.write(msg)

    @check_settle()
    def get_quality(self):
        """Return the last quality code reported by the TV."""
        return self.quality

    def parse_command(self, command):
        """Execute one prompt command.

        Args:
            command: List of words; the first is the command name
                (case-insensitive), the rest are its arguments.

        Raises:
            SystemExit: For ``quit`` / ``exit``.
        """
        if not command:
            return
        args = command[1:]
        command = command[0].lower()

        if command in ('quit', 'exit'):
            if self.dispatcher_thread:
                self.dispatcher_thread.not_stop = False
            raise SystemExit
        elif command in ('pause', 'resume', 'danmaku', 'stop', 'discover'):
            method = 'toggle_danmaku' if command == 'danmaku' else command
            getattr(self, method)()
        elif command in ('quality', 'speed', 'volume', 'seek'):
            if not args:
                ret = getattr(self, 'get_{}'.format(command))()
                # 0 is a valid volume/position, so only suppress None.
                if ret is not None:
                    print(ret)
            else:
                getattr(self, 'set_{}'.format(command))(*args)
        elif command == 'play':
            if not args:
                print('Usage: play [avXXXXXXX]')
                return
            video = args[0]
            prefix = video[:2].lower()
            if prefix == 'bv':
                try:
                    av = bv_decode(video)
                except (KeyError, IndexError):
                    print('Usage: play [avXXXXXXX]')
                    return
            elif prefix == 'av':
                av = video[2:]
            else:
                av = video
            self.play(aid=av)
        elif command == 'setup':
            if len(args) < 2 or not args[1].isdigit():
                print('Usage: setup [host] [port]')
                return
            self.setup((args[0], int(args[1])))
        elif command == 'debug':
            logger.setLevel(logging.DEBUG)
        elif command == 'login':
            if args:
                self.login(args[0])
            else:
                self.login()

    def interactive(self):
        """Run the prompt loop until EOF or Ctrl-C."""
        while True:
            try:
                command = input('📺 >>> ').split()
            except (EOFError, KeyboardInterrupt):
                # The dispatcher only exists after a successful setup().
                if self.dispatcher_thread is not None:
                    self.dispatcher_thread.not_stop = False
                break
            if command:
                self.parse_command(command)

    def login(self, access_key=None):
        """Obtain an access key, either directly or through a QR-code login.

        Args:
            access_key: When given, it is stored and no request is made.
                Otherwise a QR code is printed and the poll endpoint is
                queried once per second until it reports success.
        """
        if access_key is not None:
            self.access_key = access_key
            return

        def get_sign(data):
            # MD5 over "k=v&k=v..." (in dict order) followed by a fixed secret.
            s = '&'.join('{}={}'.format(k, v) for k, v in data.items())
            s += '59b43e04ad6965f34319062b478f83dd'
            return hashlib.md5(s.encode()).hexdigest()

        base = 'https://passport.bilibili.com/x/passport-tv-login/qrcode/'

        url = base + 'auth_code'
        data = {
            'appkey': '4409e2ce8ffd12b8',
            'local_id': 0,
            'ts': 0,
        }
        data['sign'] = get_sign(data)
        ret = requests.post(url, data=data).json()

        qrcode = qrencode_ascii.encode(ret['data']['url'])
        logger.info('Please scan qrcode and scan via Bilibili app')
        print(qrcode)

        # base already ends with "/", so no leading slash here.
        url = base + 'poll'
        data = {
            'appkey': '4409e2ce8ffd12b8',
            'auth_code': ret['data']['auth_code'],
            'local_id': 0,
            'ts': 0,
        }
        data['sign'] = get_sign(data)

        while True:
            ret = requests.post(url, data=data).json()
            if ret['code'] == 0:
                self.access_key = ret['data']['access_token']
                logger.info('Login successfully with user {}'.format(ret['data']['mid']))
                logger.info('Access key: {}'.format(self.access_key))
                break
            time.sleep(1)
def main():
    """Set up coloured INFO logging, connect to the hard-coded TV and open the prompt."""
    logger.setLevel(logging.INFO)
    stream = logging.StreamHandler()
    stream.setFormatter(ColorFormatter())
    logger.addHandler(stream)

    tv = BilibiliTV()
    # tv.discover()
    tv.setup(('192.168.31.198', 9958))
    tv.login(access_key='xxxxxxxx')
    tv.interactive()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment