Skip to content

Instantly share code, notes, and snippets.

@Langerz82
Last active January 22, 2024 05:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Langerz82/da02cc12e49c37b5cfe07b3a06b5c15e to your computer and use it in GitHub Desktop.
Save Langerz82/da02cc12e49c37b5cfe07b3a06b5c15e to your computer and use it in GitHub Desktop.
EE - 4.7 - emuelec-bluetooth - test version
#!/usr/bin/python -u
################################################################################
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023-present ebeem (https://github.com/ebeem)
# Modifications made by:
# Langerz82 (https://github.com/Langerz82)
# wang80919 (https://github.com/wang80919)
# Testing done by:
# junm6802030 (https://github.com/junm6802030)
# Special thanks to everyone who worked on this.
################################################################################
from subprocess import Popen, PIPE, STDOUT
from dataclasses import dataclass
import time
import sys
import os
from multiprocessing import Process
DEBUG = False
@dataclass
class BluetoothDevice:
mac: str
name: str
alias: str
clas: str
icon: str
paired: bool
bounded: bool
trusted: bool
blocked: bool
connected: bool
wake_allowed: bool
legacy_pairing: bool
rssi: int
class ShellIO:
@staticmethod
def execute(cmd: list[str], debug=DEBUG) -> list[str]:
if debug:
print("> " + " ".join(x for x in cmd))
out: list[str] = []
with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
out.append(line)
if debug:
print(line)
return out
@staticmethod
def execute_lookup(cmd: list[str], match: str, debug=DEBUG) -> str:
out = ShellIO.execute(cmd, debug=debug)
for o in out:
if match in o:
return o
return ""
@staticmethod
def execute_async(cmd: list[str]):
Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True)
class BluetoothCTL:
async_bluetooth_process: "Popen | None"
def __init__(self):
self.async_bluetooth_process = None
def _parse_device_info(self, mac: str):
info = ShellIO.execute(["bluetoothctl", "info", mac], debug=False)
dev = BluetoothDevice(
mac=mac,
name="",
alias="",
clas="",
icon="",
paired=False,
bounded=False,
trusted=False,
blocked=False,
connected=False,
wake_allowed=False,
legacy_pairing=False,
rssi=0,
)
for i in info:
i = i.strip()
if i.startswith("Name:"):
dev.name = i.split(":")[1].strip()
if i.startswith("Alias:"):
dev.alias = i.split(":")[1].strip()
if i.startswith("Class:"):
dev.clas = i.split(":")[1].strip()
if i.startswith("Icon:"):
dev.icon = i.split(":")[1].strip()
if i.startswith("Paired:"):
dev.paired = i.split(":")[1].strip() == "yes"
if i.startswith("Bounded:"):
dev.bounded = i.split(":")[1].strip() == "yes"
if i.startswith("Trusted:"):
dev.trusted = i.split(":")[1].strip() == "yes"
if i.startswith("Blocked:"):
dev.blocked = i.split(":")[1].strip() == "yes"
if i.startswith("Connected:"):
dev.connected = i.split(":")[1].strip() == "yes"
if i.startswith("WakeAllowed:"):
dev.wake_allowed = i.split(":")[1].strip() == "yes"
if i.startswith("LegacyPairing:"):
dev.legacy_pairing = i.split(":")[1].strip() == "yes"
if i.startswith("RSSI:"):
# Following code tested on bluez 5.66, and should work with 5.72 too.
dev.rssi = int(i.split(" ")[-1].replace("(","").replace(")","").strip())
return dev
@property
def power(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable", debug=False)
@property
def discoverable(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discoverable", debug=False)
@property
def pairable(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable", debug=False)
@power.setter
def power(self, value):
ShellIO.execute(["bluetoothctl", "power", "on" if value else "off"])
@discoverable.setter
def discoverable(self, value):
ShellIO.execute(["bluetoothctl", "discoverable", "on" if value else "off"])
@pairable.setter
def pairable(self, value):
ShellIO.execute(["bluetoothctl", "pairable", "on" if value else "off"])
@pairable.setter
def agent(self, value):
ShellIO.execute(["bluetoothctl", "agent", "on" if value else "off"])
def scan(self, timeout=10):
ShellIO.execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])
def scan_async(self, timeout=10):
# using stdbuf to flush buffer and get output immediately (bluetoothctl issue)
cmd = (
"stdbuf -oL bluetoothctl --timeout " + str(timeout) + " scan on"
if timeout > 0
else "stdbuf -oL bluetoothctl -- scan on"
)
process = Popen(cmd.split(), stdout=PIPE, universal_newlines=True)
while True:
line = process.stdout.readline()
if line == "" and process.poll() is not None:
break
if line:
line = line.strip().rstrip()
if DEBUG:
print("< ", line)
if (
len(line.split()) >= 3
and len(line.split()[2]) == 17
and (
"NEW" in line.split()[0]
or "CHG" in line.split()[0]
or "DEL" in line.split()[0]
)
):
if DEBUG:
print(bt.devices)
devs = [x for x in bt.devices if x.mac == line.split()[2]]
if len(devs) > 0:
if "DEL" in line.split()[0]:
on_device_removed(devs[0])
elif "NEW" in line.split()[0] or (
"CHG" in line.split()[0] and "RSSI:" in line
):
if DEBUG:
print(devs)
if not on_device_discovered(devs[0]):
print("stopping thread")
break
def scan_stop(self):
ShellIO.execute_async(["pkill", "-f", "bluetoothctl"])
@property
def scanning(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discovering", debug=False)
@property
def controllers(self) -> list[str]:
controllers = ShellIO.execute(["bluetoothctl", "list"], debug=False)
return [ctrl.split()[1] for ctrl in controllers]
@property
def devices(self) -> list[BluetoothDevice]:
devs = ShellIO.execute(["bluetoothctl", "devices"], debug=False)
return [self._parse_device_info(dev.split()[1]) for dev in devs]
def trust(self, dev: BluetoothDevice) -> bool:
return "succeeded" in ShellIO.execute_lookup(
["bluetoothctl", "trust", dev.mac], "trust succeeded"
)
def untrust(self, dev: BluetoothDevice) -> bool:
return "succeeded" in ShellIO.execute_lookup(
["bluetoothctl", "untrust", dev.mac], "untrust succeeded"
)
def pair(self, dev: BluetoothDevice) -> bool:
return "successful" in ShellIO.execute_lookup(
["bluetoothctl", "pair", dev.mac], "Pairing successful"
)
def disconnect(self, dev: BluetoothDevice) -> bool:
return "successful" in ShellIO.execute_lookup(
["bluetoothctl", "disconnect", dev.mac], "Disconnection successful"
)
def connect(self, dev: BluetoothDevice) -> bool:
return "successful" in ShellIO.execute_lookup(
["bluetoothctl", "connect", dev.mac], "Connection successful"
)
def forget(self, dev: BluetoothDevice) -> bool:
return "removed" in ShellIO.execute_lookup(
["bluetoothctl", "remove", dev.mac], "Device has been removed"
)
# returns true if a game is running via the emuelecRunEmu script
def is_in_game():
return ShellIO.execute_lookup(["ps", "-ef"], "emuelecRunEmu.sh", debug=False)
# returns true if another process (not current) is running infinite scan
def is_bluetooth_running():
out = ShellIO.execute(["ps", "-ef"], debug=False)
for o in out:
if "emuelec-bluetooth -1" in o:
return False if str(os.getpid()) in o else True
return False
def on_device_discovered(dev: BluetoothDevice) -> bool:
if DEBUG:
print(dev)
if dev.blocked:
return True
if (dev.trusted and dev.paired and not dev.connected and bt.connect(dev)):
return True
if (
not dev.connected # exclude already connected devices
and dev.rssi != 0 # exclude inactive devices (saved)
and "input-" in dev.icon # exclude any non-input device
):
print(
"found device {}, mac: {}, icon: {}, paired: {}".format(
dev.name, dev.mac, dev.icon, dev.paired
)
)
if bt.trust(dev) and bt.pair(dev) and bt.connect(dev):
print("connected device {}".format(dev.name))
return False
else:
bt.forget(dev)
print("forgot device {}".format(dev.name))
elif (
dev.connected and not dev.trusted
# and dev.rssi == 0
# and "input-" in dev.icon
):
print(
"trusting and pairing device {}, mac: {}, icon: {}, paired: {}".format(
dev.name, dev.mac, dev.icon, dev.paired
)
)
if bt.trust(dev):
bt.pair(dev)
print("successfully trusted {} now connect cable.".format(dev.name))
return True
def on_device_removed(dev: BluetoothDevice):
pass
SCAN_TIME = 60
# ELAPSED_TIME = 0
UPDATE_INTERVAL = 1
BEEN_IN_GAME = False
BT_THREAD: Process = None
if __name__ == "__main__":
if len(sys.argv) >= 2:
SCAN_TIME = int(sys.argv[1])
print("running scan for " + str(SCAN_TIME) + " seconds.")
bt = BluetoothCTL()
if is_bluetooth_running():
print("bluetooth is already running, exiting.")
exit()
if len(bt.controllers) == 0:
print("no controller found, exiting.")
exit()
bt.power = True
bt.agent = True
bt.discoverable = True
bt.pairable = True
start_time = time.perf_counter()
while True:
diff_time = time.perf_counter() - start_time
# if SCAN_TIME >= 0:
# if ELAPSED_TIME > SCAN_TIME:
# print(diff_time)
if SCAN_TIME != -1 and diff_time > SCAN_TIME:
exit()
if not BEEN_IN_GAME:
BEEN_IN_GAME = is_in_game()
connected_devs = [dev for dev in bt.devices if dev.connected]
if not bt.scanning and (
not BEEN_IN_GAME or len(connected_devs) == 0 or SCAN_TIME > 0
):
print("resuming scan")
BT_THREAD = Process(target=bt.scan_async, args=(SCAN_TIME,))
BT_THREAD.start()
elif (
bt.scanning and BEEN_IN_GAME and (len(connected_devs) > 0 or SCAN_TIME < 0)
):
print("stopping scan")
if BT_THREAD is not None:
BT_THREAD.terminate()
bt.scan_stop()
time.sleep(UPDATE_INTERVAL)
# ELAPSED_TIME += UPDATE_INTERVAL
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment