Skip to content

Instantly share code, notes, and snippets.

@EllieTheYeen
Last active October 29, 2023 19:24
Show Gist options
  • Save EllieTheYeen/ba2a50a1013919c53e07ae0e962ea34b to your computer and use it in GitHub Desktop.
Save EllieTheYeen/ba2a50a1013919c53e07ae0e962ea34b to your computer and use it in GitHub Desktop.
Some test bluetooth gamepad reading code for Linux using asyncio
import watchdog.observers
import watchdog.events
import asyncio_redis
import evdev
import traceback
import asyncio
import signal
import glob
class TheEventHandler(watchdog.events.FileSystemEventHandler):
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
def on_created(self, ev):
# if isinstance(ev, watchdog.events.FileCreatedEvent):
if ev.src_path.startswith("/dev/input/event"):
asyncio.run_coroutine_threadsafe(connection(ev.src_path), self.loop)
def read(gamepad, device):
a = lambda x: evque.put_nowait(x)
for event in gamepad.read():
if event.type == evdev.ecodes.SYN_REPORT:
continue
keyevent = evdev.categorize(event)
if isinstance(keyevent, evdev.events.InputEvent):
continue
s = keyevent
if event.type == evdev.ecodes.EV_KEY:
# button and down
shortest = (
s.keycode
if isinstance(s.keycode, str)
else sorted(s.keycode, key=len)[0]
)
print(device, "BTTN", s.keystate, s.scancode, shortest)
a(f"{shortest} {s.keystate}")
elif event.type == evdev.ecodes.EV_ABS:
i = s.event
# Axis and value
print(device, "AXIS", i.code, i.value)
a(f"{i.code} {i.value}")
def saferead(gamepad, device):
try:
read(gamepad, device)
except Exception as e:
if gamepad.fileno() in readingfilenos:
readingfilenos.remove(gamepad.fileno())
asyncio.get_event_loop().remove_reader(gamepad.fileno())
print(f"{device} Gamepad Disconnected")
evque.put_nowait("Disconnected")
if isinstance(e, OSError):
return
traceback.print_exc()
async def connection(device):
for a in range(30):
try:
gamepad = evdev.InputDevice(device)
evque.put_nowait("Connected")
if gamepad.fileno() not in readingfilenos:
readingfilenos.append(gamepad.fileno())
asyncio.get_event_loop().add_reader(
gamepad.fileno(), lambda: saferead(gamepad, device)
)
print(f"{device} Gamepad connected after {a} tries")
break
except OSError:
await asyncio.sleep(0.1)
else:
print(f"{device} Failed connecting to gamepad after several tries")
async def pumptask():
red = await asyncio_redis.Connection.create(host="127.0.0.1")
while True:
msg = await evque.get()
await red.publish("gamepad", msg)
evque.task_done()
async def main():
global observer, shutdown, evque
shutdown = asyncio.Event()
evque = asyncio.Queue()
def _signal_handler(*a) -> None:
if not shutdown.is_set():
print("Shutdown")
shutdown.set()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, _signal_handler)
loop.add_signal_handler(signal.SIGTERM, _signal_handler)
loop.add_signal_handler(signal.SIGABRT, _signal_handler)
observer = watchdog.observers.Observer()
observer.schedule(TheEventHandler(loop), "/dev/input", recursive=False)
observer.start()
asyncio.create_task(pumptask())
print("Started")
for g in glob.glob("/dev/input/event*"):
await connection(g)
await shutdown.wait()
print("Ending")
observer.stop()
readingfilenos = []
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment