|
import logging |
|
import os |
|
from platform import platform |
|
import random |
|
import time |
|
import sys |
|
|
|
import hid |
|
import PIL.Image |
|
import psutil |
|
import pystray |
|
import win32api |
|
import win32con |
|
import win32gui |
|
import win32ui |
|
import win32process |
|
|
|
# setup loggers |
|
# if sys.executable.endswith("pythonw.exe"): |
|
# sys.stdout = open("client_out.log", "w") |
|
# sys.stderr = open(os.path.join(os.getenv("TEMP"), "stderr-"+os.path.basename(sys.argv[0])), "w") |
|
# sys.stderr = open("client_err.log", "w") |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
filename="client.log", |
|
format="%(asctime)s - %(levelname)s -- %(message)s", |
|
datefmt="%d/%b/%y %H:%M:%S", |
|
filemode="w" |
|
) |
|
|
|
|
|
TIMEOUT = 1000 |
|
BYTE_ORDER = 'little' |
|
MAX_MSG_LEN = 64 |
|
|
|
FLAG_SUCCESS = 0b0000_0001 # (1ul << (0)) |
|
FLAG_SECURE_FAILURE = 0b0000_0010 # (1ul << (1)) |
|
FLAG_UNLOCK_IN_PROGRESS = 0b0100_0000 # (1ul << (6)) |
|
FLAG_UNLOCKED = 0b1000_0000 # (1ul << (7)) |
|
FLAG_FAILED = 0x00 |
|
|
|
# programs in which we wanna change something |
|
MAPPING = { |
|
"chrome.exe": "Chrome", |
|
"Code.exe": "VSCode", |
|
"Discord.exe": "Discord", |
|
"explorer.exe": "Explorer", |
|
"LeagueClientUx.exe": "LoL Client", #10 |
|
"League of Legends.exe": "LoL Game", |
|
"Telegram.exe": "Telegram", |
|
"WindowsTerminal.exe": "Terminal", |
|
} |
|
DEFAULT = "ignoring" |
|
LAST = "" |
|
|
|
class XAPChange: |
|
def __repr__(self): |
|
return ", ".join(f"{atr}: {getattr(self, atr)}" for atr in self._attrs) |
|
|
|
@property |
|
def position(self): |
|
pos = b"" |
|
for atr in self._attrs: |
|
if atr in ["sub", "route", "keycode"]: |
|
continue |
|
pos += _to_bytes(getattr(self, atr)) |
|
return pos |
|
|
|
@property |
|
def payload(self): |
|
if isinstance(self.keycode, int): |
|
return self.position + _to_bytes(self.keycode, 2) |
|
return self.position + self.keycode |
|
|
|
|
|
class KeycodeChange(XAPChange): |
|
_attrs = "sub", "route", "layer", "row", "col", "keycode" |
|
|
|
sub = 0x04 |
|
route = 0x03 |
|
|
|
def __init__(self, *, layer, row, col, keycode): |
|
self.layer = layer |
|
self.row = row |
|
self.col = col |
|
self.keycode = keycode |
|
|
|
|
|
class EncoderChange(XAPChange): |
|
_attrs = "sub", "route", "layer", "index", "clockwise", "keycode" |
|
|
|
sub = 0x05 |
|
route = 0x03 |
|
|
|
def __init__(self, *, layer, index, clockwise, keycode): |
|
self.layer = layer |
|
self.index = index |
|
self.clockwise = clockwise |
|
self.keycode = keycode |
|
|
|
|
|
CHANGES = { |
|
"Chrome": [ |
|
KeycodeChange(layer=0, row=0, col=0, keycode=1), |
|
EncoderChange(layer=0, index=0, clockwise=0, keycode=2) |
|
] |
|
} |
|
|
|
|
|
def _to_bytes(integer, length=1, order=BYTE_ORDER): |
|
return integer.to_bytes(length, byteorder=order) |
|
|
|
|
|
def _from_bytes(bytes_, order=BYTE_ORDER): |
|
return int.from_bytes(bytes_, byteorder=order) |
|
|
|
|
|
def _is_xap_usage(x): |
|
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058 |
|
|
|
|
|
def _search(): |
|
devices = filter(_is_xap_usage, hid.enumerate()) |
|
return list(devices) |
|
|
|
|
|
def _xap_transaction(device, sub, route, payload=b''): |
|
# gen token |
|
token = _to_bytes(random.getrandbits(16), length=2) |
|
|
|
routing = _to_bytes(sub) + _to_bytes(route) |
|
# send with padding |
|
if isinstance(payload, int): |
|
payload = _to_bytes(payload, length=2) |
|
padding_len = MAX_MSG_LEN - 1 - len(routing) - len(payload) |
|
if padding_len < 0: |
|
raise IndexError(f"payload size too large by {-padding_len} bytes") |
|
|
|
padding = b"\x00" * padding_len |
|
buffer = token + _to_bytes(len(payload+routing)) + routing + payload + padding |
|
|
|
# prepend 0 on windows because reasons... |
|
if 'windows' in platform().lower(): |
|
buffer = b"\x00" + buffer |
|
|
|
logging.debug("Sent XAP %s", buffer) |
|
device.write(buffer) |
|
|
|
# get resp |
|
array_alpha = device.read(MAX_MSG_LEN, TIMEOUT) |
|
logging.debug("Received XAP %s", array_alpha) |
|
|
|
while token != array_alpha[:2]: |
|
# FIXME: Will consume and ignore all mismatching messages |
|
array_alpha = device.read(MAX_MSG_LEN, TIMEOUT) |
|
|
|
if array_alpha[2] != FLAG_SUCCESS: |
|
raise Exception(f"Transaction unsuccesful, got response flag {hex(array_alpha[2])}") |
|
|
|
payload_len = array_alpha[3] |
|
return array_alpha[4:4 + payload_len] |
|
|
|
|
|
def main(): |
|
# hid device |
|
devices = _search() |
|
if not devices: |
|
logging.error("No devices found!") |
|
sys.exit(1) |
|
|
|
dev = devices[0] |
|
global device |
|
device = hid.Device(path=dev['path']) |
|
logging.info("Connected to: %04X:%04X - %s - %s", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string']) |
|
|
|
# icon on bottom right corner |
|
def exit_action(icon): |
|
icon.visible = False |
|
icon.stop() |
|
|
|
|
|
def setup(icon): |
|
icon.visible = True |
|
changed_keycodes = [] |
|
changed = False |
|
|
|
while icon.visible: |
|
global DEFAULT, LAST, device |
|
# get current window's exe name |
|
window = win32gui.GetForegroundWindow() |
|
name = win32gui.GetWindowText(window) |
|
pid = win32process.GetWindowThreadProcessId(window) |
|
try: |
|
program = psutil.Process(pid[-1]).name() |
|
except Exception as e: |
|
logging.warning("Could not get program name from PID %s", name) |
|
|
|
# name to string mapping |
|
text = MAPPING.get(program, DEFAULT) |
|
|
|
if text != LAST: |
|
logging.info("New current window: %s (%s)", text, program) |
|
|
|
# undo changes |
|
if changed: |
|
changed = False |
|
# iterate on reversed order so if a keycode was changed more than once we take it back to its original state |
|
for change in reversed(changed_keycodes): |
|
logging.info(f"Unmaking change in ({change.position}), after {text}") |
|
_xap_transaction(device, change.sub, change.route, change.payload) |
|
changed_keycodes = []# if this program has changes, send its name and the changes over XAP |
|
|
|
# if this program has changes, apply them |
|
if text in CHANGES.keys(): |
|
# FIXME waiting for implementation to send custom data (program name) |
|
# payload = text.encode("utf-8") |
|
# logging.info("Got answer: %s", _from_bytes(_xap_transaction(device, 0x04, 0x02, payload))) |
|
|
|
changed = True |
|
for change in CHANGES[text]: |
|
previous_keycode = _xap_transaction(device, change.sub, 0x02, change.position) |
|
logging.info(f"{previous_keycode=}") |
|
if isinstance(change, KeycodeChange): |
|
previous = KeycodeChange( |
|
layer=change.layer, |
|
row=change.row, |
|
col=change.col, |
|
keycode=previous_keycode |
|
) |
|
|
|
elif isinstance(change, EncoderChange): |
|
previous = KeycodeChange( |
|
layer=change.layer, |
|
index=change.index, |
|
clockwise=change.clockwise, |
|
keycode=previous_keycode |
|
) |
|
|
|
else: |
|
raise Exception("Unknown change type") |
|
|
|
changed_keycodes.append(previous) |
|
logging.debug(f"Making change in ({change.position}), because '{text}' is focused") |
|
_xap_transaction(device, change.sub, change.route, change.payload) |
|
|
|
LAST = text |
|
|
|
# query after 5 seconds |
|
time.sleep(5) |
|
|
|
|
|
def init_icon(): |
|
icon = pystray.Icon('XAP') |
|
icon.menu = pystray.Menu( |
|
pystray.MenuItem('Exit', lambda : exit_action(icon)), |
|
) |
|
icon.icon = PIL.Image.open("logo.png") |
|
icon.title = 'XAP - Active window' |
|
|
|
icon.run(setup) |
|
|
|
init_icon() |
|
|
|
logging.info("Ending...") |
|
sys.exit(0) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |