Skip to content

Instantly share code, notes, and snippets.

@ookiineko
Last active September 24, 2023 15:56
Show Gist options
  • Save ookiineko/df180c3ec924f242b91c7ed4604c4ff2 to your computer and use it in GitHub Desktop.
Save ookiineko/df180c3ec924f242b91c7ed4604c4ff2 to your computer and use it in GitHub Desktop.
Play with MPV using the Neat Download Manager browser plugin (and load extra subtitle files)
#!/usr/bin/env python3
# SPDX-License-Identifier: GLWTPL
import asyncio
import typing
import enum
import dataclasses
import argparse
import os.path
import sys
import threading
import multiprocessing
import subprocess
import warnings
import websockets.server
import websockets.typing
import websockets.exceptions
import PySide6.QtWidgets
# Script version; printed by the -V/--version command-line flag.
__version__ = '0.1.0'
class PayloadField(enum.StrEnum):
    """String tokens of the text payload handled by ``Payload``.

    Members are ``str`` instances, so they can be passed straight to
    ``str.split`` / ``str.join`` and compared with plain strings.
    """
    # Structural tokens: key/value separator, line separator, and the
    # space joined in front of some values when formatting.
    _SEP = ':'
    _NL = '\n'
    _SPC = ' '
    # Field keys as they appear in the payload text.
    METHOD = '1'
    URL = '2'
    MODE = '6'
    PAGE_TITLE = '4'
    ORIGIN = 'Origin'
    REFERER = 'Referer'
    PAGE_URL = '5'
    FILE_DESC = '9'
    COOKIE = 'Cookie'
class Mode(enum.StrEnum):
    """Values accepted for the payload's MODE field."""
    NORMAL = 'normal'
    HLS = 'hls'  # HTTP Live Streaming
    MEDIA = 'media'
# Alias for a plain string-to-string mapping.
_KWPAIR = dict[str, str]
# Maps each payload field key to the name of the matching ``Payload``
# attribute / constructor keyword.
_payload_field_to_kwname_map: _KWPAIR = {
    PayloadField.METHOD: 'method',
    PayloadField.URL: 'url',
    PayloadField.MODE: 'mode',
    PayloadField.PAGE_TITLE: 'page_title',
    PayloadField.ORIGIN: 'origin',
    PayloadField.REFERER: 'referer',
    PayloadField.PAGE_URL: 'page_url',
    PayloadField.FILE_DESC: 'file_desc',
    PayloadField.COOKIE: 'cookie'
}
# Keywords that may be absent from a payload; ``Payload.parse`` sets them
# to ``None`` when they are missing.
_payload_opt_kwname: tuple[str, ...] = (
    'file_desc', 'cookie'
)
# Tokens of a ``Cookie`` header value: item separator, name/value
# separator, and the space written after each item separator.
__COOKIE_SEP = ';'
__COOKIE_ITEM_SEP = '='
__COOKIE_SPC = ' '


def parse_cookie(raw: str) -> _KWPAIR:
    """Split a ``Cookie`` header value into a name -> value mapping.

    Items are separated by ``;``; each item is split on its first ``=``.
    Surrounding whitespace is stripped from items, names and values, and
    empty items are skipped.

    :param raw: the header value, e.g. ``'a=1; b=2'``
    :return: mapping of cookie names to values, in order of appearance
    :raises ValueError: if an item has no ``=`` or a name occurs twice
    """
    jar: _KWPAIR = {}
    for pos, chunk in enumerate(raw.split(__COOKIE_SEP)):
        pair = chunk.strip()
        if not pair:
            continue
        name, found, value = pair.partition(__COOKIE_ITEM_SEP)
        if not found:
            raise ValueError(f'no equal sign in item #{pos}')
        name = name.strip()
        if name in jar:
            raise ValueError(f'duplicated item {name!r}')
        jar[name] = value.strip()
    return jar
def format_cookie(cookie: _KWPAIR) -> str:
    """Render a name -> value mapping as a ``Cookie`` header value.

    Each pair becomes ``name=value`` and pairs are joined with ``'; '``,
    in the mapping's iteration order.

    :param cookie: mapping of cookie names to values (both strings)
    :return: the header value, e.g. ``'a=1; b=2'``
    """
    item_joiner = __COOKIE_SEP + __COOKIE_SPC
    rendered = [__COOKIE_ITEM_SEP.join(pair) for pair in cookie.items()]
    return item_joiner.join(rendered)
@dataclasses.dataclass
class Payload:
    """One download request in the text format received over the WebSocket.

    The text form is a newline-separated list of ``key:value`` lines,
    where the keys are the field members of ``PayloadField``.
    ``file_desc`` and ``cookie`` are optional; all other fields are
    required.
    """
    method: str
    url: str
    mode: Mode
    page_title: str
    origin: str
    referer: str
    page_url: str
    file_desc: typing.Optional[str]
    cookie: typing.Optional[_KWPAIR]

    @classmethod
    def parse(cls, raw: str) -> typing.Self:
        """Build a payload from its text form.

        Blank lines are ignored. Each remaining line is split on its
        first colon; key and value are stripped of surrounding
        whitespace. The MODE value is converted to ``Mode`` and the
        Cookie value is parsed with ``parse_cookie``.

        :param raw: the payload text
        :return: the parsed payload; missing optional fields are ``None``
        :raises ValueError: if a line has no colon, a field key is
            unknown or repeated, the mode or cookie value is invalid, or
            a required field is missing
        """
        # Values are str, Mode, a cookie dict, or None -- not only str.
        param: dict[str, typing.Any] = {}
        for lno, line in enumerate(raw.split(PayloadField._NL)):
            line = line.strip()
            if not line:
                continue
            if PayloadField._SEP not in line:
                raise ValueError(f'no colon at line #{lno}')
            k, v = (
                x.strip() for x in line.split(PayloadField._SEP, 1)
            )
            kwname = _payload_field_to_kwname_map.get(k)
            if kwname is None:
                raise ValueError(f'unknown field {k!r} at line #{lno}')
            if k == PayloadField.MODE:
                v = Mode(v)
            elif k == PayloadField.COOKIE:
                v = parse_cookie(v)
            if kwname in param:
                raise ValueError(
                    f'duplicated field {k!r} at line #{lno}'
                )
            param[kwname] = v
        for opt_kwname in _payload_opt_kwname:
            param.setdefault(opt_kwname, None)
        # Report missing required fields as ValueError, like every other
        # malformed-payload condition, instead of letting the dataclass
        # constructor fail with a TypeError about keyword arguments.
        missing = [
            field.name for field in dataclasses.fields(cls)
            if field.name not in param
        ]
        if missing:
            raise ValueError(
                'missing required field(s): ' + ', '.join(missing)
            )
        return cls(**param)

    def format(self) -> str:
        """Serialize the payload back to its text form.

        Fields are written in a fixed order, one ``key:value`` line
        each. Origin, Referer and Cookie values get a single space after
        the colon; the other fields do not. Optional fields that are
        ``None`` are omitted.

        :return: the payload text, lines joined by ``'\\n'`` with no
            trailing newline
        """
        # An empty first part makes the space-join emit a leading space.
        pad = ''
        buff: list[tuple[str, tuple[str, ...]]] = [
            (PayloadField.METHOD, (self.method,)),
            (PayloadField.URL, (self.url,)),
            (PayloadField.MODE, (self.mode,)),
            (PayloadField.PAGE_TITLE, (self.page_title,)),
            (PayloadField.ORIGIN, (pad, self.origin)),
            (PayloadField.REFERER, (pad, self.referer)),
            (PayloadField.PAGE_URL, (self.page_url,)),
        ]
        if self.file_desc is not None:
            buff.append((PayloadField.FILE_DESC, (self.file_desc,)))
        if self.cookie is not None:
            buff.append(
                (PayloadField.COOKIE, (pad, format_cookie(self.cookie)))
            )
        return PayloadField._NL.join(
            PayloadField._SEP.join((k, PayloadField._SPC.join(v)))
            for k, v in buff
        )
class NeatDMProtocol(enum.StrEnum):
    """Connection constants for the WebSocket endpoint this script serves."""
    LISTEN_ADDR = 'localhost'
    # Stored as text (StrEnum members are strings); converted with int()
    # where the server is started.
    LISTEN_PORT = '10007'
    SUBPROTOCOL = 'neatextension.v1'
    NDM_VERSION = 'WinVersion1424'

    @classmethod
    def get_subprotocols(
            cls
    ) -> tuple[websockets.typing.Subprotocol]:
        """Return the single supported subprotocol as a one-element tuple."""
        subprotocol = websockets.typing.Subprotocol(cls.SUBPROTOCOL)
        return (subprotocol,)
class ControlMessage(enum.StrEnum):
    """Text messages this server sends to the connected client."""
    # TODO: find out what each of these messages actually does.
    VERSION = NeatDMProtocol.NDM_VERSION
    SHOW_PANEL_CHROME = 'ShowPanelChrome=1'
    SHOW_PANEL_FOX = 'ShowPanelFox=1'
    SHOW_PANEL_EDGE = 'ShowPanelEdge=1'
    NO_WAITING = 'nowaiting'
async def _intro(
        ws: websockets.server.WebSocketServerProtocol
):
    """Send the opening control messages on a freshly accepted connection.

    The version message goes first, followed by the three "show panel"
    messages, each as its own WebSocket message.

    :param ws: the connection to greet
    """
    # TODO: are these ctrl msg needed?
    greeting = (
        ControlMessage.VERSION,
        ControlMessage.SHOW_PANEL_CHROME,
        ControlMessage.SHOW_PANEL_FOX,
        ControlMessage.SHOW_PANEL_EDGE,
    )
    for message in greeting:
        await ws.send(message)
# Set by a payload worker to ask the chooser thread to show a file dialog.
__fchoose_event = threading.Event()
# Held by a payload worker for the whole request/response exchange so that
# only one worker talks to the chooser thread at a time.
__fchoose_event_lock = threading.Lock()
# Set by the chooser thread once the dialog result has been stored.
__fchoose_callback_event = threading.Event()
# Files selected in the most recent dialog; empty if it was cancelled.
__fchoose_data: list[str] = []
def __fchooser_thread():
    """Serve subtitle file-dialog requests forever.

    Creates the Qt application, then loops: wait for a request event,
    show a multi-file chooser, store the selection in the module-level
    result list (emptied when the dialog is cancelled) and signal the
    waiting worker through the callback event.
    """
    global __fchoose_data
    widgets = PySide6.QtWidgets
    # Bound to a name so the application object stays referenced for the
    # lifetime of the loop below.
    qt_app = widgets.QApplication(sys.argv)
    name_filters = (
        'Subtitle files (*.ass *.srt *.vtt)',
        'All files (*.*)'
    )
    while True:
        __fchoose_event.wait()
        __fchoose_event.clear()
        chooser = widgets.QFileDialog(None)
        chooser.setWindowTitle('Select subtitle(s)')
        chooser.setNameFilters(name_filters)
        chooser.setFileMode(widgets.QFileDialog.FileMode.ExistingFiles)
        accepted = chooser.exec()
        if not accepted:
            __fchoose_data.clear()
        else:
            __fchoose_data = chooser.selectedFiles()
        __fchoose_callback_event.set()
# User-Agent string passed to mpv via --user-agent.
__MPV_UA = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/107.0.0.0 Safari/537.36'
)
# Characters in the URL that get prefixed with the platform escape char.
__escaped_chars: list[str] = ['&']

# Per-platform settings:
#   __new_term_args -- command prefix used to run mpv in a new terminal
#   __sub_sep       -- separator between paths in mpv's --sub-files value
#   __escape_char   -- character used to escape entries of __escaped_chars
__new_term_args = ()
if sys.platform == 'win32':
    __sub_sep, __escape_char = ';', '^'
    __new_term_args = ('start', '/wait')
else:
    __sub_sep, __escape_char = ':', '\\'
    if sys.platform == 'linux':
        __new_term_args = ('xterm', '-e')
    elif sys.platform == 'darwin':
        __new_term_args = ('open', '-W', '-a', 'Terminal.app')
    elif sys.platform == 'cygwin':
        warnings.warn("it's no point to run this script on "
                      'cygwin, pls consider using mingw instead')
    else:
        warnings.warn(f'unsupported platform {sys.platform!r}')
if not __new_term_args:
    warnings.warn('could not launch mpv in a new term')
def __escape_single(raw: str, target: str) -> str:
    """Prefix every occurrence of one character with the escape character.

    :param raw: the text to process
    :param target: the single character to escape
    :return: ``raw`` with each ``target`` preceded by the platform
        escape character
    """
    assert len(target) == 1, (
        f'escaping is about a single char, not {target!r}'
    )
    escaped = __escape_char + target
    return raw.replace(target, escaped)
def __escape_str(raw: str) -> str:
    """Escape, one after another, every character listed for escaping.

    :param raw: the text to process
    :return: the text after all configured characters were escaped
    """
    result = raw
    for char in __escaped_chars:
        result = __escape_single(result, char)
    return result
# Parsed command-line options; stays None until the __main__ block sets it.
__args: typing.Optional[argparse.Namespace] = None
def __start_mpv(payload: Payload, subs: list[str],
                verbose: bool, conn_num: int,
                inst_num: int):
    """Run mpv for one payload and wait for it to exit.

    Called as the target of a separate process, which is why the verbose
    flag is passed in rather than read from the module-level options.

    :param payload: the request to play; its URL, page title, origin,
        referer and cookie are turned into mpv arguments
    :param subs: subtitle file paths to load (may be empty)
    :param verbose: whether to log the command line and exit status
    :param conn_num: connection number, used in log messages only
    :param inst_num: instance number, used in log messages only
    """
    # A shell is only needed on Windows, where the ``start`` prefix has to
    # be run through cmd.exe. On POSIX, passing a list together with
    # shell=True would run only the first list item and hand the rest to
    # the shell itself, so mpv would never see its arguments; there the
    # argument list is executed directly and needs no escaping.
    use_shell = sys.platform == 'win32'
    url = __escape_str(payload.url) if use_shell else payload.url
    sub_args = [f'--sub-files={__sub_sep.join(subs)}'] if subs else []
    header_fields: dict[str, str] = {
        'Origin': payload.origin,
        'Referer': payload.referer
    }
    if payload.cookie is not None:
        header_fields['Cookie'] = format_cookie(payload.cookie)
    headers = ','.join(
        f'{name}: {value}' for name, value in header_fields.items()
    )
    mpv_args = [
        *__new_term_args, 'mpv',
        url,
        *sub_args,
        f'--title={payload.page_title}',
        '--cookies',
        f'--http-header-fields={headers}',
        f'--user-agent={__MPV_UA}'
    ]
    if verbose:
        # Display form only; it is not what gets executed.
        mpv_cmdline = '\'' + '\' \''.join(mpv_args) + '\''
        print(('[+] Starting MPV for instance '
               f'#{conn_num}.{inst_num} '
               f'with cmdline: {mpv_cmdline}'),
              file=sys.stderr)
    status = subprocess.call(mpv_args, shell=use_shell)
    if verbose:
        print((f'[+] Instance #{conn_num}.{inst_num} exited '
               f'with code {status}'),
              file=sys.stderr)
def __handle_payload(payload: Payload, conn_num: int,
                     inst_num: int):
    """Ask the user for subtitle files, then start mpv in a new process.

    Blocks until the file dialog is closed, so the connection handler
    runs it through ``asyncio.to_thread``.

    :param payload: the request to play
    :param conn_num: connection number, used in log messages only
    :param inst_num: instance number, used in log messages only
    """
    # The lock makes the request/response exchange with the chooser thread
    # exclusive. ``with`` releases it even if something inside raises;
    # a lock left held would block every later payload forever.
    with __fchoose_event_lock:
        __fchoose_event.set()
        if __args.verbose:
            print(('[+] Waiting for user to select subtitle files '
                   f'for instance #{conn_num}.{inst_num}'),
                  file=sys.stderr)
        __fchoose_callback_event.wait()
        __fchoose_callback_event.clear()
        # Copy while still holding the lock: the chooser thread reuses
        # the shared list for the next request.
        subs = __fchoose_data.copy()
    if subs and __args.verbose:
        subs_repr = ", ".join(repr(sub) for sub in subs)
        print(('[+] User selected subtitles for instance '
               f'#{conn_num}.{inst_num}: {subs_repr}'),
              file=sys.stderr)
    multiprocessing.Process(
        target=__start_mpv, args=(
            payload, subs, __args.verbose, conn_num, inst_num
        )
    ).start()
# Number of connections announced so far; only advanced in verbose mode.
__conn_cnt = 0


async def __handler(
        ws: websockets.server.WebSocketServerProtocol,
        path: str
):
    """Handle one WebSocket connection.

    For the ``/download`` path: send the intro messages, then repeatedly
    receive a payload, acknowledge it and hand it to the payload worker
    in a thread, until the connection closes.

    :param ws: the accepted connection
    :param path: the request path of the connection
    :raises ValueError: if ``path`` is not ``/download``, or if a
        received payload cannot be parsed
    """
    global __conn_cnt
    verbose = __args.verbose
    conn_num = __conn_cnt + 1
    inst_cnt = 0
    if verbose:
        __conn_cnt = conn_num
        print(f'[+] New connection #{conn_num}',
              file=sys.stderr)
    if path != '/download':
        raise ValueError(f'unsupported request path {path!r}')
    try:
        await _intro(ws)
        while True:
            raw = await ws.recv()
            payload = Payload.parse(raw)
            # TODO: what does this ctrl msg do?
            await ws.send(ControlMessage.NO_WAITING)
            if verbose:
                inst_cnt += 1
                print(f'[+] New instance #{conn_num}.{inst_cnt}',
                      file=sys.stderr)
            await asyncio.to_thread(
                __handle_payload, payload, conn_num, inst_cnt
            )
    except websockets.exceptions.ConnectionClosed:
        if verbose:
            print(f'[+] Connection #{conn_num} closed',
                  file=sys.stderr)
async def main():
    """Serve the WebSocket endpoint until the surrounding task is cancelled."""
    server = websockets.server.serve(
        __handler,
        NeatDMProtocol.LISTEN_ADDR,
        int(NeatDMProtocol.LISTEN_PORT),
        subprotocols=NeatDMProtocol.get_subprotocols(),
    )
    async with server:
        if __args.verbose:
            print('[+] Listening for connection', file=sys.stderr)
        # Awaiting a future that is never resolved keeps the server up.
        await asyncio.Future()
if __name__ == '__main__':
    # Script entry point: parse options, start the file-chooser thread,
    # then run the WebSocket server until interrupted.
    parser = argparse.ArgumentParser(
        os.path.basename(__file__),
        description='catch NeatDM browser plugin '
                    'downloads and play with MPV'
    )
    parser.add_argument('-V', '--version',
                        action='version',
                        version=__version__)
    parser.add_argument('-v', '--verbose',
                        action='store_true',
                        help='enable verbose output')
    __args = parser.parse_args()
    # Daemon thread, so it does not keep the process alive at exit.
    threading.Thread(target=__fchooser_thread, daemon=True).start()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server: leave quietly and
        # let the script end with status 0. Any other exception (e.g. the
        # port being in use) now propagates with a traceback and a
        # non-zero status instead of being replaced by a forced exit(0).
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment