Skip to content

Instantly share code, notes, and snippets.

@SF-300
Last active November 17, 2023 21:12
Show Gist options
  • Save SF-300/b5a279b28b94ef9add846de33c036cd7 to your computer and use it in GitHub Desktop.
Save SF-300/b5a279b28b94ef9add846de33c036cd7 to your computer and use it in GitHub Desktop.
Asynchronous reading of a Win32 named pipe that carries komorebi window manager events
# NOTE(zeronineseven): Heavily based on https://gist.github.com/denBot/4136279812f87819f86d99eba77c1ee0
import asyncio
import contextlib
import json
from contextlib import AsyncExitStack
from pathlib import Path
from typing import AsyncContextManager, Protocol, AsyncIterator, Never
import win32event
import win32pipe
import win32file
import pywintypes
class _PyHandle(Protocol):
    """Structural type for a handle object exposing ``handle`` and ``Close()``."""

    # Integer value of the underlying handle.
    handle: int

    def Close(self) -> None:
        """Release the underlying handle."""
        ...
class _PyHandleWrapper:
    """Adapter giving a ``_PyHandle`` the ``fileno()``/``close()`` methods.

    The wrapped object is stored as ``self.handle``; ``fileno()`` reports its
    integer ``handle`` attribute and ``close()`` forwards to its ``Close()``.
    """

    def __init__(self, handle: "_PyHandle") -> None:
        self.handle = handle

    def fileno(self) -> int:
        """Return the integer value of the wrapped handle."""
        wrapped = self.handle
        return wrapped.handle

    def close(self) -> None:
        """Close the wrapped handle."""
        wrapped = self.handle
        wrapped.Close()
@contextlib.asynccontextmanager
async def _events_stream_alive(
    pipe_name: str = "pykomorebi",
    komorebic_executable: Path = Path(r"C:\Program Files\komorebi\bin\komorebic.exe"),
) -> AsyncIterator[AsyncIterator[dict]]:
    """Subscribe to komorebi events via a named pipe and yield them as a stream.

    On entry this creates an inbound, overlapped, message-mode named pipe
    called ``pipe_name``, runs ``komorebic subscribe <pipe_name>`` to register
    it, connects the pipe and attaches it to the running event loop as a read
    pipe. The value bound by ``async with`` is an async iterator producing one
    decoded JSON object per line received on the pipe.

    On exit the transport, the connect event and the pipe handle are closed,
    including when setup fails part-way through.

    Args:
        pipe_name: Name of the pipe to create and to pass to ``komorebic``.
        komorebic_executable: Path to the ``komorebic`` executable.

    Raises:
        RuntimeError: If ``komorebic subscribe`` exits with a non-zero code.
        IOError: Raised from the yielded iterator once the pipe reaches EOF.
    """
    bufsize = 65536
    async with AsyncExitStack() as deffer:
        # Positional call; the trailing comments name the Win32 parameters.
        pipe = win32pipe.CreateNamedPipe(
            rf"\\.\pipe\{pipe_name}",  # lpName
            win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED,  # dwOpenMode
            win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE,  # dwPipeMode
            1,  # nMaxInstances
            bufsize,  # nOutBufferSize
            bufsize,  # nInBufferSize
            0,  # nDefaultTimeOut
            None,  # lpSecurityAttributes
        )
        # Register cleanup immediately so the handle is released even if the
        # subscription or the connect step below raises.
        deffer.callback(win32file.CloseHandle, pipe)

        registration = await asyncio.create_subprocess_exec(komorebic_executable, "subscribe", pipe_name)
        if (await registration.wait()) != 0:
            raise RuntimeError("Failed to register events pipe in komorebi!")

        # Keep a named reference to the event so it can be closed explicitly
        # on exit instead of relying on garbage collection.
        connect_event = win32event.CreateEvent(None, 0, 0, None)
        deffer.callback(connect_event.Close)
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = connect_event
        win32pipe.ConnectNamedPipe(pipe, overlapped)

        reader = asyncio.StreamReader()
        transport, _protocol = await asyncio.get_running_loop().connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(reader), _PyHandleWrapper(pipe)
        )
        deffer.callback(transport.close)

        async def events() -> AsyncIterator[dict]:
            """Yield one parsed JSON event per line read from the pipe."""
            while True:
                line = await reader.readline()
                if not line:
                    # readline() returns b"" only at EOF. Do not call
                    # protocol.eof_received() here: it is a transport
                    # callback with side effects, not a state query.
                    raise IOError("Connection closed")
                yield json.loads(line.decode("utf-8"))

        yield events()
async def _main() -> Never:
    """Print every event received from the komorebi events stream."""
    async with _events_stream_alive() as stream:
        async for payload in stream:
            print(payload)
if __name__ == "__main__":
    # Start the event loop only when executed as a script, so that importing
    # this module does not block on the events pipe.
    asyncio.run(_main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment