Skip to content

Instantly share code, notes, and snippets.

@yawor
Created August 31, 2021 18:55
Show Gist options
  • Save yawor/ee0f00b3d20d41d6c7f6e892ad86be25 to your computer and use it in GitHub Desktop.
Save yawor/ee0f00b3d20d41d6c7f6e892ad86be25 to your computer and use it in GitHub Desktop.
from concurrent.futures import Future
from contextlib import AbstractContextManager
from threading import Thread, RLock
from types import TracebackType
from typing import Optional, Type
from hid import Device, enumerate
def hex_fmt(data: bytes) -> str:
    """Render *data* as lowercase two-digit hex values separated by spaces.

    For example the two bytes 0x01 and 0xAB become ``"01 ab"``; empty input
    yields an empty string.
    """
    # Iterating a bytes-like object yields ints, so each byte is formatted
    # on its own instead of re-pairing the characters of ``data.hex()``.
    return " ".join(f"{byte:02x}" for byte in data)
class Frame:
    """One fixed-size request/response report exchanged with the device.

    Wire layout produced by :meth:`to_frame_data` (20 bytes):

    * byte 0      -- constant ``0x11``
    * byte 1      -- ``dev_id``
    * byte 2      -- ``feature_idx``
    * byte 3      -- ``command`` in the high nibble, ``app_id`` in the low nibble
    * bytes 4-19  -- ``data``, zero padded to 16 bytes

    NOTE(review): this looks like a Logitech HID++ "long" report -- confirm
    against the protocol specification.
    """

    dev_id: int
    feature_idx: int
    command: int
    app_id: int
    data: bytes

    def __init__(self, dev_id: int, feature_idx: int, command: int, app_id: int, data: bytes):
        """Store the header fields (masked to their wire width) and the payload.

        ``dev_id`` and ``feature_idx`` are truncated to 8 bits, ``command`` and
        ``app_id`` to 4 bits. ``data`` is cut to at most 16 bytes and padded
        with zero bytes up to exactly 16.
        """
        self.dev_id = dev_id & 0xFF
        self.feature_idx = feature_idx & 0xFF
        self.command = command & 0xF
        self.app_id = app_id & 0xF
        # bytes(...) normalises bytearray (or any other sequence of ints) so the
        # attribute really is immutable ``bytes`` as declared on the class.
        self.data = bytes(data[:16]).ljust(16, b"\x00")

    def to_frame_data(self) -> bytes:
        """Serialise the frame into the 20-byte wire representation."""
        header = bytes(
            (
                0x11,
                self.dev_id & 0xFF,
                self.feature_idx & 0xFF,
                ((self.command & 0xF) << 4) | (self.app_id & 0xF),
            )
        )
        # The masks and the ljust guard against attributes that were modified
        # after construction; a freshly built frame is already 20 bytes here.
        return (header + bytes(self.data[:16])).ljust(20, b"\x00")

    @classmethod
    def from_frame_data(cls, frame: bytes) -> "Frame":
        """Parse a raw report into a frame.

        Byte 0 of ``frame`` is ignored. Payload bytes beyond the 16th are
        dropped and a short payload is zero padded (both done by ``__init__``).

        Raises:
            ValueError: if ``frame`` has fewer than 4 bytes, i.e. no full header.
        """
        if len(frame) < 4:
            raise ValueError("Invalid data")
        # Build through ``cls`` so that subclasses get instances of themselves.
        return cls(frame[1], frame[2], frame[3] >> 4, frame[3] & 0xF, frame[4:])

    def __and__(self, other: "Frame") -> bool:
        """Return True when both frames carry the same header fields.

        Only ``dev_id``, ``feature_idx``, ``app_id`` and ``command`` are
        compared; the payload is ignored. ``LogitechDevice`` uses this to pair
        an incoming frame with the request that is waiting for a reply.
        """
        return (
            self.dev_id == other.dev_id
            and self.feature_idx == other.feature_idx
            and self.app_id == other.app_id
            and self.command == other.command
        )

    def __str__(self) -> str:
        """Return a human readable dump of all fields, payload as spaced hex."""
        return (
            f"Frame(dev_id=0x{self.dev_id:0>2x}, "
            f"feature_idx=0x{self.feature_idx:0>2x}, "
            f"command=0x{self.command:x}, "
            f"app_id=0x{self.app_id:x}, "
            f"data={hex_fmt(self.data)!r})"
        )
class LogitechDevice(AbstractContextManager):
    """Blocking request/response client on top of a ``hid`` device handle.

    A background thread keeps reading reports from the device. ``send``
    registers the outgoing frame together with a ``Future``; the reader
    resolves that future with the first incoming frame whose header matches
    the request (``Frame.__and__``). Only a single outstanding request is
    tracked at a time.

    Use it as a context manager so that the reader thread is stopped and the
    device handle is closed on exit.
    """

    # Future of the request currently waiting for a reply, or None.
    _req_future: Optional[Future]
    # The frame that was sent for ``_req_future``; incoming frames are matched
    # against it.
    _last_request: Optional[Frame]

    def __init__(
        self,
        vid: int,
        pid: int,
        interface_num: int,
        dev_id: int,
        app_id: int = 0xB,
        debug: bool = False,
    ):
        """Open the matching HID interface and start the reader thread.

        Args:
            vid: vendor id passed to ``hid.enumerate``.
            pid: product id passed to ``hid.enumerate``.
            interface_num: value the enumerated entry's ``interface_number``
                must equal.
            dev_id: default device index written into outgoing frames.
            app_id: 4-bit id written into outgoing frames and used to match
                replies.
            debug: when true, print every sent (``>``) and received (``<``)
                frame.

        Raises:
            RuntimeError: if no enumerated entry has the requested interface
                number.
        """
        self._debug = debug
        self._running = True
        self._app_id = app_id
        self._dev_id = dev_id

        path = None
        for dev in enumerate(vid, pid):
            # No early exit on purpose: if several entries match, the last
            # one wins, as it always has.
            if dev["interface_number"] == interface_num:
                path = dev["path"]
        if path is None:
            raise RuntimeError("Device/Interface not found")

        self._dev = Device(path=path)
        self._dev.nonblocking = True

        self._req_future = None
        self._last_request = None
        # Guards _req_future / _last_request, which are shared between the
        # caller of send() and the reader thread.
        self._lock = RLock()
        self._read_thread = Thread(target=self._read)
        self._read_thread.start()

    def __exit__(
        self,
        __exc_type: Optional[Type[BaseException]],
        __exc_value: Optional[BaseException],
        __traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        """Close the device; exceptions from the ``with`` body propagate."""
        self.close()
        return None

    def _read(self):
        """Reader-thread loop: parse incoming reports and resolve the pending request.

        Runs until ``close`` clears ``_running``. Each read waits at most
        500 ms so the flag is re-checked regularly.
        """
        while self._running:
            res = self._dev.read(64, 500)
            if not res:
                # Nothing arrived within the read timeout.
                continue
            try:
                frame = Frame.from_frame_data(res)
            except ValueError:
                # A report too short to contain a header must not kill the
                # reader thread, otherwise every later send() would time out.
                if self._debug:
                    print("< (ignored short report)", hex_fmt(res))
                continue
            if self._debug:
                print("<", frame)
            with self._lock:
                if self._req_future is not None and self._last_request & frame:
                    self._req_future.set_result(frame)
                    self._req_future = None

    def send(
        self,
        feature_idx: int,
        cmd: int,
        data: bytes,
        dev_id: Optional[int] = None,
        timeout: float = 1,
    ) -> Frame:
        """Send one request and block until the matching reply arrives.

        Args:
            feature_idx: feature index byte of the request.
            cmd: 4-bit command / function number.
            data: payload, at most 16 bytes are used.
            dev_id: device index; defaults to the one given to the constructor.
            timeout: seconds to wait for the reply.

        Returns:
            The first received frame whose ``dev_id``, ``feature_idx``,
            ``command`` and ``app_id`` equal those of the request.

        Raises:
            concurrent.futures.TimeoutError: if no matching frame arrives
                within ``timeout`` seconds.
        """
        if dev_id is None:
            dev_id = self._dev_id
        frame = Frame(dev_id, feature_idx, cmd, self._app_id, data)
        if self._debug:
            print(">", frame)

        fut = Future()
        # Register the request before writing so that a fast reply cannot
        # arrive while nothing is waiting for it.
        with self._lock:
            self._last_request = frame
            self._req_future = fut
        try:
            self._dev.write(frame.to_frame_data())
            return fut.result(timeout)
        finally:
            # After a timeout or a failed write the future is abandoned;
            # unregister it so the reader does not resolve a future nobody
            # waits on. On success the reader has already cleared the slot.
            with self._lock:
                if self._req_future is fut:
                    self._req_future = None

    def close(self):
        """Stop the reader thread, wait for it to finish, then close the device."""
        self._running = False
        # The reader notices the flag after its current read returns.
        self._read_thread.join()
        self._dev.close()
if __name__ == "__main__":
    # Demo: list the feature table of the device at index 1 reachable through
    # interface 2 of USB device 046d:c541.
    with LogitechDevice(0x046D, 0xC541, 2, 1) as kbd:
        # Feature index 0x1, command 0x0: the first reply byte is used as the
        # number of entries to query.
        # NOTE(review): presumably the HID++ feature-set "get count" call; that
        # count may not include index 0, in which case the last entry is
        # skipped -- TODO confirm against the protocol specification.
        count = kbd.send(0x1, 0x0, b"").data[0]
        for i in range(count):
            # Feature index 0x1, command 0x1 with the entry index as payload;
            # the first two reply bytes form a big-endian 16-bit id.
            res = kbd.send(0x1, 0x1, bytearray([i]))
            feat_id = (res.data[0] << 8) | res.data[1]
            print(f"{i:0>2x}, {feat_id:0>4x}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment