Last active
October 29, 2023 19:24
-
-
Save EllieTheYeen/ba2a50a1013919c53e07ae0e962ea34b to your computer and use it in GitHub Desktop.
Some test Bluetooth gamepad-reading code for Linux using asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import watchdog.observers | |
import watchdog.events | |
import asyncio_redis | |
import evdev | |
import traceback | |
import asyncio | |
import signal | |
import glob | |
class TheEventHandler(watchdog.events.FileSystemEventHandler):
    """Filesystem handler that connects to input event nodes as they appear.

    The handler is given the asyncio loop at construction time and submits
    ``connection()`` to it with ``asyncio.run_coroutine_threadsafe``, i.e.
    it does not assume it is being called on the loop's own thread.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop):
        # Loop on which the connection coroutines are scheduled.
        self.loop = loop

    def on_created(self, ev):
        """Schedule a connection attempt for a newly created event node.

        Only the path prefix is checked; the concrete event class is not.
        """
        path = ev.src_path
        if not path.startswith("/dev/input/event"):
            return
        asyncio.run_coroutine_threadsafe(connection(path), self.loop)
def read(gamepad, device):
    """Drain pending events from *gamepad*, log them and queue them.

    Button events are queued on the module-level ``evque`` as
    ``"<keycode> <keystate>"`` and absolute-axis events as
    ``"<code> <value>"``.  Every other event type is ignored.

    Args:
        gamepad: Opened ``evdev.InputDevice`` to read from.
        device: Device path; used only as a prefix for the log lines.

    Raises:
        Whatever ``gamepad.read()`` raises (the caller treats any exception
        as a disconnect).
    """
    for event in gamepad.read():
        # Skip synchronisation markers.  EV_SYN is the event *type*;
        # SYN_REPORT is a *code* inside that type and merely happens to
        # share the numeric value 0, so comparing the type against it was
        # correct only by coincidence.
        if event.type == evdev.ecodes.EV_SYN:
            continue

        categorized = evdev.categorize(event)
        # An event that categorize() could not wrap comes back as a plain
        # InputEvent; there is nothing useful to report for those.
        if isinstance(categorized, evdev.events.InputEvent):
            continue

        if event.type == evdev.ecodes.EV_KEY:
            # Button plus its up/down/hold state.
            keycode = categorized.keycode
            # keycode is either a single name or a collection of aliases;
            # in the latter case report the shortest alias (first on ties).
            shortest = keycode if isinstance(keycode, str) else min(keycode, key=len)
            print(device, "BTTN", categorized.keystate, categorized.scancode, shortest)
            evque.put_nowait(f"{shortest} {categorized.keystate}")
        elif event.type == evdev.ecodes.EV_ABS:
            # Axis code and its new value.
            raw = categorized.event
            print(device, "AXIS", raw.code, raw.value)
            evque.put_nowait(f"{raw.code} {raw.value}")
def saferead(gamepad, device):
    """Reader callback: process pending events, tearing down on any error.

    Any exception from ``read()`` is treated as a disconnect: the file
    descriptor is dropped from ``readingfilenos`` and from the event loop,
    the device is closed, and ``"Disconnected"`` is queued on ``evque``.
    An ``OSError`` is the expected way for a device to vanish and is not
    reported further; any other exception also gets its traceback printed.

    Args:
        gamepad: Opened ``evdev.InputDevice`` registered with the loop.
        device: Device path; used only in the log message.
    """
    try:
        read(gamepad, device)
    except Exception as e:
        # Capture the descriptor before closing; it is needed to unregister
        # the reader and is no longer meaningful once the device is closed.
        fd = gamepad.fileno()
        if fd in readingfilenos:
            readingfilenos.remove(fd)
        asyncio.get_event_loop().remove_reader(fd)
        # Release the descriptor; without this every disconnect leaked one.
        try:
            gamepad.close()
        except OSError:
            # The device is already gone; nothing more to release.
            pass
        print(f"{device} Gamepad Disconnected")
        evque.put_nowait("Disconnected")
        if not isinstance(e, OSError):
            traceback.print_exc()
async def connection(device):
    """Open *device* and register it with the event loop, retrying on OSError.

    Up to 30 attempts are made, sleeping 0.1 s after each failed one.  On
    success ``"Connected"`` is queued on ``evque``, the descriptor is
    recorded in ``readingfilenos`` and ``saferead`` is installed as the
    reader callback.  If every attempt fails, a failure line is printed.

    Args:
        device: Path of the input event node to open.
    """
    for attempt in range(30):
        try:
            gamepad = evdev.InputDevice(device)
            evque.put_nowait("Connected")
            if gamepad.fileno() not in readingfilenos:
                readingfilenos.append(gamepad.fileno())
            loop = asyncio.get_event_loop()
            loop.add_reader(gamepad.fileno(), lambda: saferead(gamepad, device))
            print(f"{device} Gamepad connected after {attempt} tries")
            return
        except OSError:
            # Opening failed; wait briefly before the next attempt.
            await asyncio.sleep(0.1)
    print(f"{device} Failed connecting to gamepad after several tries")
async def pumptask():
    """Publish every message taken from ``evque`` to the Redis ``gamepad`` channel.

    Connects to Redis on 127.0.0.1 once, then loops forever; the coroutine
    only ends by cancellation or by an exception from the connection.
    """
    redis_conn = await asyncio_redis.Connection.create(host="127.0.0.1")
    while True:
        message = await evque.get()
        await redis_conn.publish("gamepad", message)
        # Mark the item as processed for anyone waiting on evque.join().
        evque.task_done()
async def main():
    """Run the gamepad bridge until SIGINT, SIGTERM or SIGABRT is received.

    Sets up the module-level ``shutdown`` event and ``evque`` queue, starts
    the Redis pump task, watches ``/dev/input`` for new event nodes,
    connects to the nodes that already exist and then waits for shutdown.
    The filesystem observer is always stopped and joined on the way out.
    """
    global observer, shutdown, evque
    shutdown = asyncio.Event()
    evque = asyncio.Queue()

    def _signal_handler(*a) -> None:
        # Only announce the first signal; set() itself is idempotent.
        if not shutdown.is_set():
            print("Shutdown")
        shutdown.set()

    loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGABRT):
        loop.add_signal_handler(signum, _signal_handler)

    # Keep a strong reference for the lifetime of main(): the event loop
    # only holds weak references to tasks, so a discarded create_task()
    # result may be garbage-collected before it finishes.
    pump = asyncio.create_task(pumptask())

    observer = watchdog.observers.Observer()
    observer.schedule(TheEventHandler(loop), "/dev/input", recursive=False)
    observer.start()
    try:
        print("Started")
        # Pick up the devices that were present before the observer started.
        for g in glob.glob("/dev/input/event*"):
            await connection(g)
        await shutdown.wait()
        print("Ending")
    finally:
        # Stop the watcher thread even if connecting or waiting failed, and
        # wait for it so no new connection is scheduled during teardown.
        observer.stop()
        observer.join()
        # The pump loops forever; asyncio.run() would cancel it anyway, but
        # cancelling here makes the intent explicit.
        pump.cancel()
# File descriptors of the input devices currently registered as readers on
# the event loop; appended to in connection() and removed from in saferead().
readingfilenos = []


# Script entry point: run the bridge until it is told to shut down.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment