Skip to content

Instantly share code, notes, and snippets.

@Langerz82
Last active July 10, 2023 01:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Langerz82/5cda9c64fd6fc71127a99f84313152ba to your computer and use it in GitHub Desktop.
Save Langerz82/5cda9c64fd6fc71127a99f84313152ba to your computer and use it in GitHub Desktop.
EE - emuelec-bluetooth v2
#!/usr/bin/env python
from subprocess import Popen, PIPE, STDOUT
from dataclasses import dataclass
import time
import sys
import os
import re
from threading import Thread
from multiprocessing import Process
# When True, ShellIO.execute echoes each command and its output, and
# BluetoothCTL.scan_async echoes every line read from the scan process.
DEBUG = False
@dataclass
class BluetoothDevice:
    """Snapshot of one device as reported by ``bluetoothctl info <mac>``.

    Only ``mac`` is required; every other field defaults to the "unknown"
    value (empty string, ``False`` or ``0``) so a device can be created first
    and filled in while the tool output is parsed.
    """

    mac: str  # device address, e.g. "AA:BB:CC:DD:EE:FF"
    name: str = ""  # value of the "Name:" line
    alias: str = ""  # value of the "Alias:" line
    clas: str = ""  # value of the "Class:" line ("class" is a reserved word)
    icon: str = ""  # value of the "Icon:" line, e.g. "input-gaming"
    paired: bool = False
    bounded: bool = False  # bonded flag; attribute name kept as originally spelled
    trusted: bool = False
    blocked: bool = False
    connected: bool = False
    wake_allowed: bool = False
    legacy_pairing: bool = False
    rssi: int = 0  # signal strength; 0 is treated as "no reading" by callers
class ShellIO:
@staticmethod
def execute(cmd: list[str], debug=DEBUG) -> list[str]:
if debug:
print("> " + " ".join(x for x in cmd))
out: list[str] = []
with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
out.append(line)
if debug:
print(line)
return out
@staticmethod
def execute_lookup(cmd: list[str], match: str, debug=DEBUG) -> str:
out = ShellIO.execute(cmd, debug=debug)
for o in out:
if match in o:
return o
return ""
@staticmethod
def execute_async(cmd: list[str]):
Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True)
class BluetoothCTL:
async_bluetooth_process: Popen | None
def __init__(self):
self.async_bluetooth_process = None
def _parse_device_info(self, mac: str):
info = ShellIO.execute(["bluetoothctl", "info", mac], debug=False)
dev = BluetoothDevice(
mac=mac,
name="",
alias="",
clas="",
icon="",
paired=False,
bounded=False,
trusted=False,
blocked=False,
connected=False,
wake_allowed=False,
legacy_pairing=False,
rssi=0,
)
for i in info:
i = i.strip()
if i.startswith("Name:"):
dev.name = i.split(":")[1].strip()
if i.startswith("Alias:"):
dev.alias = i.split(":")[1].strip()
if i.startswith("Class:"):
dev.clas = i.split(":")[1].strip()
if i.startswith("Icon:"):
dev.icon = i.split(":")[1].strip()
if i.startswith("Paired:"):
dev.paired = i.split(":")[1].strip() == "yes"
if i.startswith("Bounded:"):
dev.bounded = i.split(":")[1].strip() == "yes"
if i.startswith("Trusted:"):
dev.trusted = i.split(":")[1].strip() == "yes"
if i.startswith("Blocked:"):
dev.blocked = i.split(":")[1].strip() == "yes"
if i.startswith("Connected:"):
dev.connected = i.split(":")[1].strip() == "yes"
if i.startswith("WakeAllowed:"):
dev.wake_allowed = i.split(":")[1].strip() == "yes"
if i.startswith("LegacyPairing:"):
dev.legacy_pairing = i.split(":")[1].strip() == "yes"
if i.startswith("RSSI:"):
dev.rssi = int(i.split(":")[1].strip())
return dev
@property
def power(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable")
@property
def discoverable(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discoverable")
@property
def pairable(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable")
@power.setter
def power(self, value):
ShellIO.execute(["bluetoothctl", "power", "on" if value else "off"])
@discoverable.setter
def discoverable(self, value):
ShellIO.execute(["bluetoothctl", "discoverable", "on" if value else "off"])
@pairable.setter
def pairable(self, value):
ShellIO.execute(["bluetoothctl", "pairable", "on" if value else "off"])
@pairable.setter
def agent(self, value):
ShellIO.execute(["bluetoothctl", "agent", "on" if value else "off"])
def scan(self, timeout=10):
ShellIO.execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])
def scan_async(self, timeout=10):
# using stdbuf to flush buffer and get output immediately (bluetoothctl issue)
cmd = (
"stdbuf -oL bluetoothctl --timeout " + str(timeout) + " scan on"
if timeout > 0
else "stdbuf -oL bluetoothctl -- scan on"
)
process = Popen(cmd.split(), stdout=PIPE, universal_newlines=True)
while True:
line = process.stdout.readline()
if line == "" and process.poll() is not None:
break
if line:
line = line.strip().rstrip()
if DEBUG:
print("< ", line)
if (
len(line.split()) >= 3
and len(line.split()[2]) == 17
and (
"NEW" in line.split()[0]
or "CHG" in line.split()[0]
or "DEL" in line.split()[0]
)
):
devs = [x for x in bt.devices() if x.mac == line.split()[2]]
if len(devs) > 0:
if "DEL" in line.split()[0]:
on_device_removed(devs[0])
elif "NEW" in line.split()[0] or (
"CHG" in line.split()[0] and "RSSI:" in line
):
if not on_device_discovered(devs[0]):
print("stopping thread")
break
def scan_stop(self):
ShellIO.execute_async(["pkill", "-f", "bluetoothctl"])
@property
def scanning(self) -> bool:
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discovering")
def devices(self) -> list[BluetoothDevice]:
devs = ShellIO.execute(["bluetoothctl", "devices"], debug=False)
return [self._parse_device_info(dev.split()[1]) for dev in devs]
def trust(self, dev: BluetoothDevice) -> bool:
return "succeeded" in ShellIO.execute_lookup(
["bluetoothctl", "trust", dev.mac], "trust succeeded"
)
def untrust(self, dev: BluetoothDevice) -> bool:
return "succeeded" in ShellIO.execute_lookup(
["bluetoothctl", "untrust", dev.mac], "untrust succeeded"
)
def pair(self, dev: BluetoothDevice) -> bool:
return "successful" in ShellIO.execute_lookup(
["bluetoothctl", "pair", dev.mac], "Pairing successful"
)
def disconnect(self, dev: BluetoothDevice) -> bool:
return "successful" in ShellIO.execute_lookup(
["bluetoothctl", "disconnect", dev.mac], "Disconnection successful"
)
def connect(self, dev: BluetoothDevice) -> bool:
return "successful" in ShellIO.execute_lookup(
["bluetoothctl", "connect", dev.mac], "Connection successful"
)
def forget(self, dev: BluetoothDevice) -> bool:
return "removed" in ShellIO.execute_lookup(
["bluetoothctl", "remove", dev.mac], "Device has been removed"
)
def is_in_game() -> bool:
    """Return True if a game is running via the emuelecRunEmu script.

    Detection is a plain text search for "emuelecRunEmu.sh" in the output of
    ``ps -ef``.
    """
    return bool(ShellIO.execute_lookup(["ps", "-ef"], "emuelecRunEmu.sh", debug=False))
def is_bluetooth_running() -> bool:
    """Return True if another process (not this one) runs the infinite scan.

    Every ``ps -ef`` line mentioning "emuelec-bluetooth -1" is inspected. A
    line counts as this process when one of its whitespace-separated fields
    equals our PID exactly; any other matching line means another instance
    is already running.
    """
    own_pid = str(os.getpid())
    for line in ShellIO.execute(["ps", "-ef"], debug=False):
        if "emuelec-bluetooth -1" not in line:
            continue
        # Compare whole fields rather than substrings so that PID 12 is not
        # mistaken for part of 3123, and keep looking past our own entry.
        if own_pid not in line.split():
            return True
    return False
def on_device_discovered(dev: BluetoothDevice) -> bool:
    """React to a device reported by the scan; return False to stop scanning.

    Uses the module-level ``bt`` controller. Blocked devices and devices whose
    icon does not contain "input-" are ignored. A disconnected input device
    with a signal reading is trusted, paired and connected; if any of those
    steps fails the device is forgotten again. An input device that is
    already connected but not yet trusted is trusted and paired.

    Returns:
        False once a device has been connected, True otherwise.
    """
    if dev.blocked:
        return True

    is_input = "input-" in dev.icon  # exclude any non-input device

    if (
        not dev.connected  # exclude already connected devices
        and dev.rssi != 0  # exclude inactive devices (saved)
        and is_input
    ):
        print(
            f"found device {dev.name}, mac: {dev.mac}, "
            f"icon: {dev.icon}, paired: {dev.paired}"
        )
        if bt.trust(dev) and bt.pair(dev) and bt.connect(dev):
            print(f"connected device {dev.name}")
            return False
        bt.forget(dev)
        print(f"forgot device {dev.name}")
    elif dev.connected and not dev.trusted and is_input:
        print(
            f"trusting and pairing device {dev.name}, mac: {dev.mac}, "
            f"icon: {dev.icon}, paired: {dev.paired}"
        )
        if bt.trust(dev):
            bt.pair(dev)
            print(f"successfully trusted {dev.name} now connect cable.")
    return True
def on_device_removed(dev: BluetoothDevice):
    """Hook called when the scan reports a device as removed; does nothing."""
# Total run time in seconds; overridden by the first command-line argument.
# A negative value disables the limit.
SCAN_TIME = 60
# Seconds spent in the main loop so far (advanced by UPDATE_INTERVAL per tick).
ELAPSED_TIME = 0
# Seconds between two checks of the game / device / scan state.
UPDATE_INTERVAL = 5
# Child process currently running BluetoothCTL.scan_async, if any.
BT_THREAD: Process | None = None

if __name__ == "__main__":
    if len(sys.argv) >= 2:
        SCAN_TIME = int(sys.argv[1])
    print("running scan on " + str(SCAN_TIME))

    # Must stay a module-level name: on_device_discovered uses ``bt``.
    bt = BluetoothCTL()
    if is_bluetooth_running():
        sys.exit()

    bt.power = True
    bt.agent = True
    bt.discoverable = True
    bt.pairable = True

    while True:
        if 0 <= SCAN_TIME < ELAPSED_TIME:
            sys.exit()

        devices_count = sum(1 for d in bt.devices() if d.connected)
        # print("devices_count=", devices_count)

        # Query each state once per tick; every query spawns a process.
        in_game = is_in_game()
        scanning = bt.scanning

        # Scan whenever no game is running, or when no device is connected.
        # Start and stop use the same predicate: the original stop condition
        # (in game OR a device connected) overlapped with the start condition,
        # which made the scan toggle on and off on alternating ticks.
        should_scan = not in_game or devices_count == 0

        if should_scan and not scanning:
            print("emustation is active now, resuming scan")
            BT_THREAD = Process(target=bt.scan_async, args=(SCAN_TIME,))
            BT_THREAD.start()
        elif not should_scan and scanning:
            print("emustation is inactive now, stopping scan")
            if BT_THREAD is not None:
                BT_THREAD.terminate()
            bt.scan_stop()

        time.sleep(UPDATE_INTERVAL)
        ELAPSED_TIME += UPDATE_INTERVAL
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment