Skip to content

Instantly share code, notes, and snippets.

@peddamat
Forked from da-rth/komorebi_named_pipe.py
Last active March 7, 2023 18:06
Show Gist options
  • Save peddamat/ac8f78a375d003d69669d75a012a6c46 to your computer and use it in GitHub Desktop.
Save peddamat/ac8f78a375d003d69669d75a012a6c46 to your computer and use it in GitHub Desktop.
An example of how to have komorebi subscribe and send events to a named pipe in python
import win32pipe
import win32file
import pywintypes
import subprocess
import json
import time
KOMOREBI_BUFF_SIZE = 64 * 1024
KOMOREBI_PIPE_NAME = "yasb"
if __name__ == "__main__":
print("Creating named pipe:", KOMOREBI_PIPE_NAME)
pipe = win32pipe.CreateNamedPipe(
f"\\\\.\\pipe\\{KOMOREBI_PIPE_NAME}",
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1,
KOMOREBI_BUFF_SIZE,
KOMOREBI_BUFF_SIZE,
0,
None
)
print("Waiting for Komorebi")
proc = subprocess.Popen(["komorebic.exe", "subscribe", KOMOREBI_PIPE_NAME])
proc.communicate()
while proc.returncode != 0:
print(f"Komorebic.exe failed with error code {proc.returncode}. Retrying in 5 seconds...")
time.sleep(5)
proc = subprocess.Popen(["komorebic.exe", "subscribe", KOMOREBI_PIPE_NAME])
proc.communicate()
try:
win32pipe.ConnectNamedPipe(pipe, None)
print("Komorebi Connected")
while True:
result, data = win32file.ReadFile(pipe, KOMOREBI_BUFF_SIZE, None)
# Filters out newlines
if not data.strip():
continue
event = json.loads(data.decode("utf-8"))
print("Received event:", event)
except pywintypes.error as e:
if e.args[0] == 109:
print("Komorebi Disconnected")
else:
print("Unknown Error", e)
finally:
win32file.CloseHandle(pipe)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment