-
-
Save yawor/ee0f00b3d20d41d6c7f6e892ad86be25 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import Future | |
from contextlib import AbstractContextManager | |
from threading import Thread, RLock | |
from types import TracebackType | |
from typing import Optional, Type | |
from hid import Device, enumerate | |
def hex_fmt(data: bytes) -> str:
    """Render *data* as lowercase two-digit hex bytes separated by spaces.

    Example: ``b"\\x01\\xab"`` becomes ``"01 ab"``; empty input gives ``""``.
    """
    digits = data.hex()
    # Each byte contributes exactly two hex digits, so walk the string in
    # steps of two and cut out one byte's worth of digits at a time.
    octets = [digits[pos:pos + 2] for pos in range(0, len(digits), 2)]
    return " ".join(octets)
class Frame:
    """One fixed-size request/response report exchanged with the device.

    On the wire a frame is 20 bytes: a constant report ID (0x11), the device
    index, the feature index, one byte packing ``command`` (high nibble) and
    ``app_id`` (low nibble), followed by 16 payload bytes.

    NOTE(review): this layout looks like a Logitech HID++ 2.0 "long" report;
    confirm against the protocol documentation.
    """

    REPORT_ID = 0x11  # first byte of every serialized frame
    HEADER_LEN = 4  # report ID + device index + feature index + command/app byte
    DATA_LEN = 16  # payload is always exactly this many bytes
    FRAME_LEN = HEADER_LEN + DATA_LEN

    dev_id: int
    feature_idx: int
    command: int
    app_id: int
    data: bytes

    def __init__(self, dev_id: int, feature_idx: int, command: int, app_id: int, data: bytes):
        """Build a frame, masking each field to its wire width.

        Args:
            dev_id: Device index; only the low 8 bits are kept.
            feature_idx: Feature index; only the low 8 bits are kept.
            command: Command number; only the low 4 bits are kept.
            app_id: Application/software id; only the low 4 bits are kept.
            data: Payload; truncated to 16 bytes and zero-padded on the right.
        """
        self.dev_id = dev_id & 0xFF
        self.feature_idx = feature_idx & 0xFF
        self.command = command & 0xF
        self.app_id = app_id & 0xF
        # Normalise to immutable ``bytes`` so the attribute matches its
        # annotation even when the caller hands in a ``bytearray``.
        payload = bytes(data[: self.DATA_LEN])
        self.data = payload.ljust(self.DATA_LEN, b"\x00")

    def to_frame_data(self) -> bytes:
        """Serialize the frame into the 20-byte report written to the device."""
        # Fields are masked again because attributes may have been reassigned
        # to out-of-range values after construction.
        header = bytes(
            [
                self.REPORT_ID,
                self.dev_id & 0xFF,
                self.feature_idx & 0xFF,
                ((self.command & 0xF) << 4) | (self.app_id & 0xF),
            ]
        )
        report = header + bytes(self.data[: self.DATA_LEN])
        return report.ljust(self.FRAME_LEN, b"\x00")

    @classmethod
    def from_frame_data(cls, frame: bytes) -> "Frame":
        """Parse a raw report into a frame.

        The first byte (report ID) is not validated. Payload beyond 16 bytes
        is dropped and a short payload is zero-padded by the constructor.

        Raises:
            ValueError: If *frame* is shorter than the 4-byte header.
        """
        if len(frame) < cls.HEADER_LEN:
            raise ValueError("Invalid data")
        packed = frame[3]
        # Use ``cls`` so that subclasses get instances of their own type.
        return cls(frame[1], frame[2], packed >> 4, packed & 0xF, frame[cls.HEADER_LEN:])

    def __and__(self, other: "Frame") -> bool:
        """Return True when *other* has the same header fields as this frame.

        This is a "does this response belong to that request" predicate, not
        a bitwise AND: device index, feature index, app id and command must
        all be equal; the payload is ignored.
        """
        return (
            self.dev_id == other.dev_id
            and self.feature_idx == other.feature_idx
            and self.app_id == other.app_id
            and self.command == other.command
        )

    def __str__(self) -> str:
        """Return a human-readable dump of the header fields and payload."""
        return (
            f"Frame(dev_id=0x{self.dev_id:0>2x}, "
            f"feature_idx=0x{self.feature_idx:0>2x}, "
            f"command=0x{self.command:x}, "
            f"app_id=0x{self.app_id:x}, "
            f"data={hex_fmt(self.data)!r})"
        )
class LogitechDevice(AbstractContextManager):
    """Blocking request/response channel to one HID interface of a device.

    A background thread keeps reading input reports. ``send`` writes a request
    frame and blocks until the reader sees a report whose header (device
    index, feature index, command, app id) matches that request.

    Only one request can be outstanding at a time: a second ``send`` issued
    concurrently from another thread replaces the first one's pending future,
    so the first caller will time out.

    Use as a context manager, or call ``close`` explicitly; the reader thread
    is not a daemon thread and keeps the process alive until it is stopped.
    """

    _req_future: Optional[Future]
    _last_request: Optional[Frame]

    def __init__(
        self,
        vid: int,
        pid: int,
        interface_num: int,
        dev_id: int,
        app_id: int = 0xB,
        debug: bool = False,
    ):
        """Open the matching HID interface and start the reader thread.

        Args:
            vid: USB vendor id passed to ``hid.enumerate``.
            pid: USB product id passed to ``hid.enumerate``.
            interface_num: ``interface_number`` of the entry to open.
            dev_id: Default device index placed in outgoing frames.
            app_id: Application id placed in outgoing frames (low 4 bits used).
            debug: When true, print every sent (``>``) and received (``<``) frame.

        Raises:
            RuntimeError: If no enumerated entry has the requested interface.
        """
        self._debug = debug
        self._running = True
        self._app_id = app_id
        self._dev_id = dev_id

        # ``enumerate`` here is ``hid.enumerate`` (it shadows the builtin).
        # If several entries match, the last one wins.
        path = None
        for dev in enumerate(vid, pid):
            if dev["interface_number"] == interface_num:
                path = dev["path"]
        if path is None:
            raise RuntimeError("Device/Interface not found")

        self._dev = Device(path=path)
        self._dev.nonblocking = True
        self._req_future = None
        self._last_request = None
        # Guards ``_req_future`` / ``_last_request``, which are shared between
        # the caller of ``send`` and the reader thread.
        self._lock = RLock()
        self._read_thread = Thread(target=self._read)
        self._read_thread.start()

    def __exit__(
        self,
        __exc_type: Optional[Type[BaseException]],
        __exc_value: Optional[BaseException],
        __traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        """Close the device on leaving the ``with`` block; never suppresses exceptions."""
        self.close()
        return None

    def _read(self):
        """Reader-thread loop: parse incoming reports and resolve the pending request."""
        while self._running:
            # The read is bounded (second argument, presumably milliseconds -
            # confirm against the hid package) so ``_running`` is re-checked
            # regularly and ``close`` can stop the loop.
            res = self._dev.read(64, 500)
            if not res:
                continue
            try:
                frame = Frame.from_frame_data(res)
            except ValueError:
                # A report shorter than the frame header cannot be a reply.
                # Skip it instead of letting the exception kill this thread,
                # which would make every later ``send`` time out.
                continue
            if self._debug:
                print("<", frame)
            with self._lock:
                if self._req_future is not None and self._last_request & frame:
                    self._req_future.set_result(frame)
                    self._req_future = None
                    self._last_request = None

    def send(
        self,
        feature_idx: int,
        cmd: int,
        data: bytes,
        dev_id: Optional[int] = None,
        timeout: float = 1,
    ) -> Frame:
        """Send one request and wait for the matching response frame.

        Args:
            feature_idx: Feature index of the request.
            cmd: Command number of the request.
            data: Payload (at most 16 bytes are sent).
            dev_id: Device index; defaults to the one given at construction.
            timeout: Seconds to wait for the response.

        Returns:
            The first received frame whose header matches the request.

        Raises:
            concurrent.futures.TimeoutError: If no matching frame arrives in time.
        """
        if dev_id is None:
            dev_id = self._dev_id
        frame = Frame(dev_id, feature_idx, cmd, self._app_id, data)
        if self._debug:
            print(">", frame)
        fut: Future = Future()
        try:
            with self._lock:
                # Register the request before writing, under the same lock the
                # reader takes, so a fast reply cannot be missed.
                self._last_request = frame
                self._req_future = fut
                self._dev.write(frame.to_frame_data())
            return fut.result(timeout)
        finally:
            # On timeout or a failed write the request would otherwise stay
            # registered forever. Only clear it if it is still ours.
            with self._lock:
                if self._req_future is fut:
                    self._req_future = None
                    self._last_request = None

    def close(self):
        """Stop the reader thread, wait for it to finish, then close the device."""
        self._running = False
        self._read_thread.join()
        self._dev.close()
if __name__ == "__main__":
    # Demo: ask feature index 0x1 how many entries it has (command 0x0), then
    # query each index (command 0x1) and print "index, 16-bit id" in hex.
    # NOTE(review): if command 0x0 is HID++ IFeatureSet.getCount, the count
    # may exclude the root feature, in which case the last index is never
    # queried - confirm against the protocol documentation.
    with LogitechDevice(0x046D, 0xC541, 2, 1) as keyboard:
        feature_count = keyboard.send(0x1, 0x0, b"").data[0]
        for i in range(feature_count):
            reply = keyboard.send(0x1, 0x1, bytearray([i]))
            # The id is the first two payload bytes, most significant first.
            feat_id = int.from_bytes(reply.data[:2], "big")
            print(f"{i:0>2x}, {feat_id:0>4x}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment