Last active
September 24, 2023 15:56
-
-
Save ookiineko/df180c3ec924f242b91c7ed4604c4ff2 to your computer and use it in GitHub Desktop.
Play with MPV but using Neat Download Manager browser plugin (and load extra subtitle files)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# SPDX-License-Identifier: GLWTPL | |
import asyncio | |
import typing | |
import enum | |
import dataclasses | |
import argparse | |
import os.path | |
import sys | |
import threading | |
import multiprocessing | |
import subprocess | |
import warnings | |
import websockets.server | |
import websockets.typing | |
import websockets.exceptions | |
import PySide6.QtWidgets | |
# Script version; printed by the -V/--version command-line option.
__version__ = '0.1.0'
class PayloadField(enum.StrEnum): | |
_SEP = ':' | |
_NL = '\n' | |
_SPC = ' ' | |
METHOD = '1' | |
URL = '2' | |
MODE = '6' | |
PAGE_TITLE = '4' | |
ORIGIN = 'Origin' | |
REFERER = 'Referer' | |
PAGE_URL = '5' | |
FILE_DESC = '9' | |
COOKIE = 'Cookie' | |
class Mode(enum.StrEnum): | |
NORMAL = 'normal' | |
HLS = 'hls' # HTTP Live Streaming | |
MEDIA = 'media' | |
# Alias for a plain string-to-string mapping (field map, parsed cookies).
_KWPAIR = dict[str, str]

# Maps each payload field identifier to the name of the ``Payload``
# attribute (and constructor keyword) that receives its value.
_payload_field_to_kwname_map: _KWPAIR = {
    PayloadField.METHOD: 'method',
    PayloadField.URL: 'url',
    PayloadField.MODE: 'mode',
    PayloadField.PAGE_TITLE: 'page_title',
    PayloadField.ORIGIN: 'origin',
    PayloadField.REFERER: 'referer',
    PayloadField.PAGE_URL: 'page_url',
    PayloadField.FILE_DESC: 'file_desc',
    PayloadField.COOKIE: 'cookie',
}

# Keyword names a payload is allowed to omit; ``Payload.parse`` stores
# ``None`` for each one that is absent.  Annotated as a variable-length
# tuple: ``tuple[str]`` would describe a tuple of exactly one element.
_payload_opt_kwname: tuple[str, ...] = (
    'file_desc', 'cookie',
)
__COOKIE_SEP = ';' | |
__COOKIE_ITEM_SEP= '=' | |
__COOKIE_SPC = ' ' | |
def parse_cookie(raw: str) -> _KWPAIR: | |
res = {} | |
for idx, item in enumerate( | |
x.strip() for x in raw.split(__COOKIE_SEP) | |
): | |
if not item: | |
continue | |
if item.count(__COOKIE_ITEM_SEP) == 0: | |
raise ValueError( | |
f'no equal sign in item #{idx}' | |
) | |
k, v = ( | |
x.strip() for x in item.split( | |
__COOKIE_ITEM_SEP, 1 | |
) | |
) | |
if k in res.keys(): | |
raise ValueError(f'duplicated item {k!r}') | |
else: | |
res[k] = v | |
return res | |
def format_cookie(cookie: _KWPAIR) -> str:
    """Render a name-to-value mapping as a ``Cookie`` header value.

    :param cookie: mapping of cookie names to values
    :return: the items as ``name=value`` joined by ``'; '``, in mapping
        order; an empty string for an empty mapping
    """
    item_joiner = __COOKIE_SEP + __COOKIE_SPC
    rendered_items = [
        __COOKIE_ITEM_SEP.join((name, value))
        for name, value in cookie.items()
    ]
    return item_joiner.join(rendered_items)
@dataclasses.dataclass
class Payload:
    """One request received from the browser plugin, in parsed form.

    ``file_desc`` and ``cookie`` are ``None`` when the corresponding
    field line was absent from the payload text.
    """

    method: str
    url: str
    mode: Mode
    page_title: str
    origin: str
    referer: str
    page_url: str
    file_desc: typing.Optional[str]
    cookie: typing.Optional[_KWPAIR]

    @classmethod
    def parse(cls, raw: str) -> typing.Self:
        """Build a payload from its ``identifier:value`` line format.

        Blank lines are ignored and whitespace around identifiers and
        values is stripped.  The mode value is converted to ``Mode`` and
        the cookie value is parsed with ``parse_cookie``.

        :param raw: the payload text, one field per line
        :raises ValueError: on a line without a colon, an unknown or
            repeated field, an unknown mode, or a malformed cookie
        :raises TypeError: from the dataclass constructor when a
            non-optional field is missing
        """
        fields: dict[str, typing.Any] = {}
        for lno, chunk in enumerate(raw.split(PayloadField._NL)):
            line = chunk.strip()
            if not line:
                continue
            key, colon, value = line.partition(PayloadField._SEP)
            if not colon:
                raise ValueError(
                    f'no colon at line #{lno}'
                )
            key = key.strip()
            value = value.strip()
            kwname = _payload_field_to_kwname_map.get(key)
            if kwname is None:
                raise ValueError(
                    f'unknown field {key!r} at line #{lno}'
                )
            # Convert the two structured fields before the duplicate
            # check, so a bad value is reported first.
            if key == PayloadField.MODE:
                value = Mode(value)
            elif key == PayloadField.COOKIE:
                value = parse_cookie(value)
            if kwname in fields:
                raise ValueError(
                    f'duplicated field {key!r} at line #{lno}'
                )
            fields[kwname] = value
        # Optional fields that never showed up are stored as None.
        for opt_kwname in _payload_opt_kwname:
            fields.setdefault(opt_kwname, None)
        return cls(**fields)

    def format(self) -> str:
        """Serialise the payload back into its line format.

        Origin, Referer and Cookie lines are written with a space after
        the colon; the other fields follow the colon directly.
        Optional fields that are ``None`` are left out.
        """
        def render(field: str, *parts: str) -> str:
            # "<identifier>:<parts joined by a single space>"
            return PayloadField._SEP.join(
                (field, PayloadField._SPC.join(parts))
            )

        # An empty leading part makes the join emit the space that
        # follows the colon on header-style lines.
        lead = ''
        lines = [
            render(PayloadField.METHOD, self.method),
            render(PayloadField.URL, self.url),
            render(PayloadField.MODE, self.mode),
            render(PayloadField.PAGE_TITLE, self.page_title),
            render(PayloadField.ORIGIN, lead, self.origin),
            render(PayloadField.REFERER, lead, self.referer),
            render(PayloadField.PAGE_URL, self.page_url),
        ]
        if self.file_desc is not None:
            lines.append(render(PayloadField.FILE_DESC, self.file_desc))
        if self.cookie is not None:
            lines.append(
                render(
                    PayloadField.COOKIE, lead, format_cookie(self.cookie)
                )
            )
        return PayloadField._NL.join(lines)
class NeatDMProtocol(enum.StrEnum):
    """Connection constants for the WebSocket server this script runs."""

    LISTEN_ADDR = 'localhost'
    LISTEN_PORT = '10007'    # stored as text; main() converts it with int()
    SUBPROTOCOL = 'neatextension.v1'
    NDM_VERSION = 'WinVersion1424'

    @classmethod
    def get_subprotocols(
            cls
    ) -> tuple[websockets.typing.Subprotocol]:
        """Return the one supported subprotocol as a one-element tuple."""
        subprotocol = websockets.typing.Subprotocol(cls.SUBPROTOCOL)
        return (subprotocol,)
class ControlMessage(enum.StrEnum):
    """Fixed text messages this server sends to the plugin."""

    # TODO: the purpose of these messages is not understood yet.
    VERSION = NeatDMProtocol.NDM_VERSION
    SHOW_PANEL_CHROME = 'ShowPanelChrome=1'
    SHOW_PANEL_FOX = 'ShowPanelFox=1'
    SHOW_PANEL_EDGE = 'ShowPanelEdge=1'
    NO_WAITING = 'nowaiting'
async def _intro(
        ws: websockets.server.WebSocketServerProtocol
):
    """Send the opening control messages on a new connection.

    The version message goes first, followed by the three show-panel
    messages, each sent only after the previous send has completed.
    """
    # TODO: find out whether these control messages are needed at all.
    opening_messages = (
        ControlMessage.VERSION,
        ControlMessage.SHOW_PANEL_CHROME,
        ControlMessage.SHOW_PANEL_FOX,
        ControlMessage.SHOW_PANEL_EDGE,
    )
    for message in opening_messages:
        await ws.send(message)
# State shared between ``__handle_payload`` (which asks for a dialog)
# and ``__fchooser_thread`` (which shows it).
__fchoose_event = threading.Event()            # set to request a dialog
__fchoose_event_lock = threading.Lock()        # held for a whole request
__fchoose_callback_event = threading.Event()   # set when the dialog is done
__fchoose_data: list[str] = []                 # files picked in the last dialog


def __fchooser_thread():
    """Show a subtitle file dialog each time one is requested.

    Loops forever: waits for ``__fchoose_event``, shows a multi-file
    selection dialog, stores the chosen paths in ``__fchoose_data``
    (emptied when the dialog is cancelled) and then sets
    ``__fchoose_callback_event``.
    """
    global __fchoose_data
    widgets = PySide6.QtWidgets
    # Not used again below; presumably the reference keeps the Qt
    # application object alive while dialogs are shown.
    __app = widgets.QApplication(sys.argv)
    name_filters = (
        'Subtitle files (*.ass *.srt *.vtt)',
        'All files (*.*)'
    )
    while True:
        __fchoose_event.wait()
        __fchoose_event.clear()
        dialog = widgets.QFileDialog(None)
        dialog.setWindowTitle('Select subtitle(s)')
        dialog.setNameFilters(name_filters)
        dialog.setFileMode(widgets.QFileDialog.FileMode.ExistingFiles)
        accepted = dialog.exec()
        if not accepted:
            __fchoose_data.clear()
        else:
            __fchoose_data = dialog.selectedFiles()
        __fchoose_callback_event.set()
__MPV_UA = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' | |
'AppleWebKit/537.36 (KHTML, like Gecko) ' | |
'Chrome/107.0.0.0 Safari/537.36') | |
__escaped_chars: list[str] = ['&'] | |
if sys.platform == 'win32': | |
__new_term_args = ('start', '/wait') | |
__sub_sep = ';' | |
__escape_char = '^' | |
else: | |
__new_term_args = () | |
__sub_sep = ':' | |
__escape_char = '\\' | |
if sys.platform == 'cygwin': | |
warnings.warn('it\'s no point to run this script on ' | |
'cygwin, pls consider using mingw instead') | |
elif sys.platform == 'linux': | |
__new_term_args = ('xterm', '-e') | |
elif sys.platform == 'darwin': | |
__new_term_args = ('open', '-W', '-a', 'Terminal.app') | |
else: | |
warnings.warn(f'unsupported platform {sys.platform!r}') | |
if not __new_term_args: | |
warnings.warn('could not launch mpv in a new term') | |
def __escape_single(raw: str, target: str) -> str:
    """Prefix every occurrence of one character with the escape character.

    :param raw: text to process
    :param target: the single character to escape
    :return: *raw* with ``__escape_char`` inserted before each *target*
    """
    assert len(target) == 1, (f'escaping is about a single '
                              f'char, not {target!r}')
    escaped_target = __escape_char + target
    return raw.replace(target, escaped_target)
def __escape_str(raw: str) -> str:
    """Escape every character listed in ``__escaped_chars`` within *raw*.

    The characters are handled one after another, in list order, each
    pass working on the result of the previous one.
    """
    result = raw
    for char in __escaped_chars:
        result = __escape_single(result, char)
    return result
# Parsed command-line options; stays None until the __main__ block
# assigns the result of parse_args().
__args: typing.Optional[argparse.Namespace] = None
def __start_mpv(payload: Payload, subs: list[str],
                verbose: bool, conn_num: int,
                inst_num: int):
    """Run mpv on the payload's URL and wait for it to exit.

    Used as the target of a ``multiprocessing.Process``.  The payload's
    Origin, Referer and (if present) Cookie are forwarded to mpv as HTTP
    header fields, and the page title becomes the window title.

    :param payload: the request to play
    :param subs: subtitle file paths to load; may be empty
    :param verbose: print the command line and the exit code to stderr
    :param conn_num: connection number, used in verbose messages only
    :param inst_num: instance number, used in verbose messages only
    """
    # ``start`` is a cmd.exe builtin, so Windows has to go through the
    # shell.  Everywhere else the list must be executed directly: on
    # POSIX, ``shell=True`` with a list runs only its first item and
    # hands the remaining items to the shell itself, so mpv would never
    # see its arguments.
    use_shell = sys.platform == 'win32'
    # Shell escaping is only meaningful when a shell parses the command.
    url = __escape_str(payload.url) if use_shell else payload.url

    sub_args = []
    if subs:
        sub_args.append(f'--sub-files={__sub_sep.join(subs)}')

    _headers: dict[str, str] = {
        'Origin': payload.origin,
        'Referer': payload.referer
    }
    if payload.cookie is not None:
        _headers['Cookie'] = format_cookie(payload.cookie)
    # NOTE(review): the fields are joined with ','; a header value that
    # itself contains a comma is presumably split by mpv - confirm.
    headers = ','.join(
        ': '.join((k, v)) for k, v in _headers.items()
    )

    mpv_args = [
        *__new_term_args, 'mpv',
        url,
        *sub_args,
        f'--title={payload.page_title}',
        '--cookies',
        f'--http-header-fields={headers}',
        f'--user-agent={__MPV_UA}'
    ]
    if verbose:
        mpv_cmdline = '\'' + '\' \''.join(mpv_args) + '\''
        print(('[+] Starting MPV for instance '
               f'#{conn_num}.{inst_num} '
               f'with cmdline: {mpv_cmdline}'),
              file=sys.stderr)
    status = subprocess.call(mpv_args, shell=use_shell)
    if verbose:
        print((f'[+] Instance #{conn_num}.{inst_num} exited '
               f'with code {status}'),
              file=sys.stderr)
def __handle_payload(payload: Payload, conn_num: int,
                     inst_num: int):
    """Ask the user for subtitle files, then start mpv in a new process.

    Blocking; ``__handler`` runs it through ``asyncio.to_thread``.  The
    file-chooser lock is held for the whole exchange, so only one
    request at a time talks to the file-chooser thread.

    :param payload: the request to play
    :param conn_num: connection number, used in verbose messages only
    :param inst_num: instance number, used in verbose messages only
    """
    # ``with`` releases the lock even when something below raises; a
    # lock left held would block every later request forever.
    with __fchoose_event_lock:
        __fchoose_event.set()
        if __args.verbose:
            print(('[+] Waiting for user to select subtitle files '
                   f'for instance #{conn_num}.{inst_num}'),
                  file=sys.stderr)
        __fchoose_callback_event.wait()
        __fchoose_callback_event.clear()
        # Copy the result before the next request can replace it.
        subs = __fchoose_data.copy()
        if subs and __args.verbose:
            subs_repr = ", ".join(repr(sub) for sub in subs)
            print(('[+] User selected subtitles for instance '
                   f'#{conn_num}.{inst_num}: {subs_repr}'),
                  file=sys.stderr)
        multiprocessing.Process(
            target=__start_mpv, args=(
                payload, subs, __args.verbose, conn_num, inst_num
            )
        ).start()
# Connection counter behind the "#N" labels in verbose output; it is
# only advanced when verbose mode is on.
__conn_cnt = 0


async def __handler(
        ws: websockets.server.WebSocketServerProtocol,
        path: str
):
    """Serve one plugin connection.

    On the ``/download`` path: sends the opening control messages, then
    repeatedly receives a payload, acknowledges it and hands it to
    ``__handle_payload`` in a worker thread, until the connection closes.

    :param ws: the accepted WebSocket connection
    :param path: the request path of the connection
    :raises ValueError: if *path* is not ``/download``, or if a received
        payload cannot be parsed
    """
    global __conn_cnt
    inst_cnt = 0
    conn_num = __conn_cnt + 1
    if __args.verbose:
        __conn_cnt += 1
        print(f'[+] New connection #{conn_num}',
              file=sys.stderr)

    if path != '/download':
        raise ValueError(f'unsupported request path {path!r}')

    try:
        await _intro(ws)
        while True:
            raw_payload = await ws.recv()
            payload = Payload.parse(raw_payload)
            # TODO: find out what this control message is for.
            await ws.send(ControlMessage.NO_WAITING)
            if __args.verbose:
                inst_cnt += 1
                print(f'[+] New instance #{conn_num}.{inst_cnt}',
                      file=sys.stderr)
            # The dialog round-trip blocks, so it runs off the event loop.
            await asyncio.to_thread(
                __handle_payload, payload, conn_num, inst_cnt
            )
    except websockets.exceptions.ConnectionClosed:
        if __args.verbose:
            print(f'[+] Connection #{conn_num} closed',
                  file=sys.stderr)
async def main():
    """Run the WebSocket server until the task is cancelled."""
    server = websockets.server.serve(
        __handler,
        NeatDMProtocol.LISTEN_ADDR,
        int(NeatDMProtocol.LISTEN_PORT),
        subprotocols=NeatDMProtocol.get_subprotocols(),
    )
    async with server:
        if __args.verbose:
            print('[+] Listening for connection', file=sys.stderr)
        # Nothing ever resolves this future, so the server stays up.
        await asyncio.Future()
if __name__ == '__main__':
    # Command-line entry point: parse options, start the file-chooser
    # thread, then run the server until interrupted.
    parser = argparse.ArgumentParser(
        os.path.basename(__file__),
        description='catch NeatDM browser plugin '
                    'downloads and play with MPV'
    )
    parser.add_argument('-V', '--version',
                        action='version',
                        version=__version__)
    parser.add_argument('-v', '--verbose',
                        action='store_true',
                        help='enable verbose output')
    __args = parser.parse_args()
    # Daemon thread: it loops forever and must not keep the process alive.
    threading.Thread(target=__fchooser_thread, daemon=True).start()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; fall through and exit with 0.
        pass
    # Any other exception (for example the listening port being taken)
    # now propagates with a traceback and a non-zero exit status.  The
    # former ``finally: exit(0)`` replaced it with a silent exit code 0.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment