Skip to content

Instantly share code, notes, and snippets.

@da-rth
Created October 25, 2021 18:09
Show Gist options
  • Save da-rth/4136279812f87819f86d99eba77c1ee0 to your computer and use it in GitHub Desktop.
Save da-rth/4136279812f87819f86d99eba77c1ee0 to your computer and use it in GitHub Desktop.
An example of how to have komorebi subscribe and send events to a named pipe in python
import win32pipe
import win32file
import pywintypes
import subprocess
import json
import time
KOMOREBI_BUFF_SIZE = 64 * 1024
KOMOREBI_PIPE_NAME = "yasb"
if __name__ == "__main__":
print("Creating named pipe:", KOMOREBI_PIPE_NAME)
pipe = win32pipe.CreateNamedPipe(
f"\\\\.\\pipe\\{KOMOREBI_PIPE_NAME}",
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1,
KOMOREBI_BUFF_SIZE,
KOMOREBI_BUFF_SIZE,
0,
None
)
print("Waiting for Komorebi")
proc = subprocess.Popen(["komorebic.exe", "add-subscriber", KOMOREBI_PIPE_NAME])
proc.communicate()
while proc.returncode != 0:
print(f"Komorebic.exe failed with error code {proc.returncode}. Retrying in 5 seconds...")
time.sleep(5)
proc = subprocess.Popen(["komorebic.exe", "add-subscriber", KOMOREBI_PIPE_NAME])
proc.communicate()
try:
win32pipe.ConnectNamedPipe(pipe, None)
print("Komorebi Connected")
while True:
result, data = win32file.ReadFile(pipe, KOMOREBI_BUFF_SIZE, None)
# Filters out newlines
if not data.strip():
continue
event = json.loads(data.decode("utf-8"))
print("Received event:", event)
except pywintypes.error as e:
if e.args[0] == 109:
print("Komorebi Disconnected")
else:
print("Unknown Error", e)
finally:
win32file.CloseHandle(pipe)
@peddamat
Copy link

peddamat commented Mar 7, 2023

Hi, thanks for providing this. The api has changed slightly and 'add-subscriber' should now be 'subscribe' (in lines 25 and 31).

I've corrected it here: https://gist.github.com/peddamat/ac8f78a375d003d69669d75a012a6c46

@SF-300
Copy link

SF-300 commented May 2, 2023

In case somebody needs to read komorebi events from named win32 pipe asynchronously through asyncio, here is another gist (heavily based on this one).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment