Skip to content

Instantly share code, notes, and snippets.

@justfoolingaround
Created May 14, 2022 12:01
Show Gist options
  • Save justfoolingaround/76e221a9a6bee89216bacb184b67726f to your computer and use it in GitHub Desktop.
Save justfoolingaround/76e221a9a6bee89216bacb184b67726f to your computer and use it in GitHub Desktop.
Windows MPV IPC Connection Client
import contextlib
import random
import sys
import threading
import time
from multiprocessing.connection import PipeConnection
import _winapi
import ujson
class VincibleThread(threading.Thread):
    """A thread that can be asked to terminate itself from the outside.

    ``run`` installs a trace function for the thread; once :meth:`kill` has
    been called, the next traced ``"line"`` event raises ``SystemExit``
    inside the thread. Termination is therefore cooperative: it only happens
    when the thread goes on to execute another line of Python code.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as :class:`threading.Thread`."""
        super().__init__(*args, **kwargs)
        # One event per thread. A class-level event would be shared by every
        # instance, so killing a single thread would kill all current and
        # future ones.
        self.kill_state = threading.Event()

    def run(self):
        """Install the tracing hook, then run the thread's target as usual."""
        sys.settrace(self.global_trace)
        return super().run()

    def global_trace(self, stack_frame, reason, *args, **kwargs):
        """Return the per-frame tracer for "call" events, otherwise None."""
        if reason == "call":
            return self.local_trace
        return None

    def local_trace(self, stack_frame, reason, *args, **kwargs):
        """Raise ``SystemExit`` on a "line" event once the thread is killed.

        :raises SystemExit: if :meth:`kill` was called and ``reason`` is
            ``"line"``.
        """
        if self.kill_state.is_set() and reason == "line":
            raise SystemExit()
        return self.local_trace

    def kill(self):
        """Flag this thread (and only this thread) for termination."""
        return self.kill_state.set()
class WindowsMPVSocket(object):
    """
    mpv JSON-IPC socket with receiving hooks and communication support.

    :param ipc_path: path to mpv's IPC socket.
    """

    def __init__(self, ipc_path):
        """Open the pipe at ``ipc_path`` and wrap it in a ``PipeConnection``.

        :raises FileNotFoundError: if the pipe cannot be opened.
        """
        self.ipc_path = ipc_path
        # Received messages that carry a "request_id", keyed by that id,
        # kept until communicate() collects them.
        self.requests = {}

        try:
            pipe_handle = _winapi.CreateFile(
                self.ipc_path,
                _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                _winapi.NULL,
                _winapi.NULL,
                _winapi.OPEN_EXISTING,
                _winapi.FILE_FLAG_OVERLAPPED,
                _winapi.NULL,
            )
        except FileNotFoundError as err:
            raise FileNotFoundError(
                "Failed to create handler for the IPC socket. "
                "Make sure mpv's IPC socket is accessible."
            ) from err

        self.socket = PipeConnection(pipe_handle)
        self.recieving_threads: "list[VincibleThread]" = []

    def send(self, command):
        """Serialize ``command`` as one JSON line and write it to the pipe.

        :raises BrokenPipeError: if writing to the pipe fails with ``OSError``.
        """
        payload = ujson.dumps(command).encode("utf-8") + b"\n"
        try:
            self.socket.send_bytes(payload)
        except OSError as err:
            # Forward the arguments of the caught exception instance; the
            # ``OSError`` class itself has no usable ``args``.
            raise BrokenPipeError(*err.args) from err

    def communicate(self, command, *, timeout=5.0):
        """Send ``command`` and wait for the reply with the same request id.

        A random ``request_id`` is added to ``command`` (in place) when it
        has none. If no receiver thread is currently active, a temporary one
        is started for the duration of the wait.

        :param command: the message dict to send.
        :param timeout: maximum number of seconds to wait for the reply.
        :return: the reply dict, or ``{}`` if none arrived in time.
        """
        if "request_id" not in command:
            request_id = random.randrange(0, 2**32)
            command.update({"request_id": request_id})
        else:
            request_id = command["request_id"]

        self.send(command)

        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + timeout

        reciever = None
        if all(thread.kill_state.is_set() for thread in self.recieving_threads):
            reciever = self.on_receive(lambda _: None)

        try:
            while request_id not in self.requests and time.monotonic() < deadline:
                # Yield instead of spinning so the receiver thread can run.
                time.sleep(0.001)
        finally:
            # Stop the temporary receiver even if the wait is interrupted.
            if reciever is not None:
                reciever.kill()

        return self.requests.pop(request_id, {})

    def recv(self):
        """Read one newline-terminated message from the pipe and decode it.

        Bytes are read one at a time up to the first ``b"\\n"``. A blank line
        yields ``{}``. A decoded message containing ``"request_id"`` is also
        stored in ``self.requests`` under that id.

        :raises BrokenPipeError: if the pipe reports end-of-file.
        """
        try:
            retval = b"".join(iter(lambda: self.socket.recv_bytes(1), b"\n")).strip()
        except EOFError as err:
            raise BrokenPipeError(
                "Cannot read bytes from socket, possibly due to process unavailability."
            ) from err

        if retval:
            retval = ujson.loads(retval.decode("utf-8"))
        else:
            retval = {}

        if "request_id" in retval:
            self.requests[retval["request_id"]] = retval

        return retval

    def __iter__(self):
        """Yield received messages until :meth:`recv` returns ``{}``."""
        yield from iter(self.recv, {})

    def on_receive(self, callback):
        """Start a thread that calls ``callback`` with every received message.

        :param callback: callable taking one decoded message.
        :return: the started ``VincibleThread``; call ``kill()`` on it to stop.
        """

        def reciever_hook():
            main_thread = threading.main_thread()
            current_thread: VincibleThread = threading.current_thread()

            for data in self:
                # Flag this thread for termination once the main thread is gone.
                if not main_thread.is_alive():
                    current_thread.kill()
                callback(data)

        thread = VincibleThread(target=reciever_hook)
        thread.start()
        self.recieving_threads.append(thread)
        return thread

    def close(self):
        """Flag all receiver threads for termination and close the pipe."""
        for thread in self.recieving_threads:
            if not thread.kill_state.is_set():
                thread.kill()

        # Best-effort close: a bare ``suppress()`` suppresses nothing, so
        # name the error an already-broken pipe would raise.
        with contextlib.suppress(OSError):
            self.socket.close()
if __name__ == "__main__":
    import shlex

    # Interactive demo: connect to the pipe, print every message the player
    # sends, and forward each shell-style line typed at the prompt as an
    # mpv command.
    socket = WindowsMPVSocket("\\\\.\\pipe\\mpvsocket")
    socket.on_receive(print)

    while True:
        arguments = shlex.split(input("> "))
        reply = socket.communicate({"command": arguments})
        print("[*] {}".format(reply))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment