Last active
July 5, 2023 07:16
-
-
Save Langerz82/6838a6c44232902a47182658c55cc00f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from subprocess import Popen, PIPE, check_output | |
from dataclasses import dataclass | |
import time | |
import sys | |
import os | |
import re | |
DEBUG = False | |
@dataclass | |
class BluetoothDevice: | |
mac: str | |
name: str | |
alias: str | |
clas: str | |
icon: str | |
paired: bool | |
bounded: bool | |
trusted: bool | |
blocked: bool | |
connected: bool | |
wake_allowed: bool | |
legacy_pairing: bool | |
rssi: int | |
class ShellIO: | |
@staticmethod | |
def execute(cmd: list[str]) -> list[str]: | |
if DEBUG: | |
print("> " + " ".join(x for x in cmd)) | |
out: list[str] = [] | |
with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p: | |
for line in p.stdout: | |
out.append(line) | |
if DEBUG: | |
print(line) | |
return out | |
@staticmethod | |
def execute_lookup(cmd: list[str], match: str) -> str: | |
out = ShellIO.execute(cmd) | |
for o in out: | |
if match in o: | |
return o | |
return "" | |
@staticmethod | |
def execute_async(cmd: list[str]): | |
Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) | |
@staticmethod | |
def execute_output(cmd: list[str]): | |
exec_in = ' '.join(cmd) | |
return os.popen(exec_in).readlines() | |
class BluetoothCTL: | |
def _parse_device_info(self, mac: str): | |
info = ShellIO.execute(["bluetoothctl", "info", mac]) | |
dev = BluetoothDevice( | |
mac=mac, | |
name="", | |
alias="", | |
clas="", | |
icon="", | |
paired=False, | |
bounded=False, | |
trusted=False, | |
blocked=False, | |
connected=False, | |
wake_allowed=False, | |
legacy_pairing=False, | |
rssi=0, | |
) | |
for i in info: | |
if "Name:" in i: | |
dev.name = i.split(":")[1].strip() | |
if "Alias:" in i: | |
dev.alias = i.split(":")[1].strip() | |
if "Class:" in i: | |
dev.clas = i.split(":")[1].strip() | |
if "Icon:" in i: | |
dev.icon = i.split(":")[1].strip() | |
if "Paired:" in i: | |
dev.paired = i.split(":")[1].strip() == "yes" | |
if "Bounded:" in i: | |
dev.bounded = i.split(":")[1].strip() == "yes" | |
if "Trusted:" in i: | |
dev.trusted = i.split(":")[1].strip() == "yes" | |
if "Blocked:" in i: | |
dev.blocked = i.split(":")[1].strip() == "yes" | |
if "Connected:" in i: | |
dev.connected = i.split(":")[1].strip() == "yes" | |
if "WakeAllowed:" in i: | |
dev.wake_allowed = i.split(":")[1].strip() == "yes" | |
if "LegacyPairing:" in i: | |
dev.legacy_pairing = i.split(":")[1].strip() == "yes" | |
if "RSSI:" in i: | |
dev.rssi = int(i.split(":")[1].strip()) | |
return dev | |
@property | |
def power(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable") | |
@property | |
def discoverable(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discoverable") | |
@property | |
def pairable(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable") | |
@power.setter | |
def power(self, value): | |
ShellIO.execute(["bluetoothctl", "power", "on" if value else "off"]) | |
@discoverable.setter | |
def discoverable(self, value): | |
ShellIO.execute(["bluetoothctl", "discoverable", "on" if value else "off"]) | |
@pairable.setter | |
def pairable(self, value): | |
ShellIO.execute(["bluetoothctl", "pairable", "on" if value else "off"]) | |
@pairable.setter | |
def agent(self, value): | |
ShellIO.execute(["bluetoothctl", "agent", "on" if value else "off"]) | |
def scan(self, timeout=10): | |
ShellIO.execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"]) | |
def scan_async(self, timeout=10): | |
if timeout > 0: | |
ShellIO.execute_async( | |
["bluetoothctl", "--timeout", str(timeout), "scan", "on"] | |
) | |
else: | |
ShellIO.execute_async(["bluetoothctl", "scan", "on"]) | |
def scan_stop(self): | |
ShellIO.execute_async(["pkill", "-f", "bluetoothctl scan on"]) | |
def devices(self) -> list[BluetoothDevice]: | |
devs = ShellIO.execute(["bluetoothctl", "devices"]) | |
return [self._parse_device_info(dev.split()[1]) for dev in devs] | |
def trust(self, dev: BluetoothDevice) -> bool: | |
return "succeeded" in ShellIO.execute_lookup( | |
["bluetoothctl", "trust", dev.mac], "trust succeeded" | |
) | |
def untrust(self, dev: BluetoothDevice) -> bool: | |
return "succeeded" in ShellIO.execute_lookup( | |
["bluetoothctl", "untrust", dev.mac], "untrust succeeded" | |
) | |
def pair(self, dev: BluetoothDevice) -> bool: | |
return "successful" in ShellIO.execute_lookup( | |
["bluetoothctl", "pair", dev.mac], "Pairing successful" | |
) | |
def connect(self, dev: BluetoothDevice) -> bool: | |
return "successful" in ShellIO.execute_lookup( | |
["bluetoothctl", "connect", dev.mac], "Connection successful" | |
) | |
# add by flying wang. | |
def disconnect(self, dev: BluetoothDevice) -> bool: | |
return "successful" in ShellIO.execute_lookup( | |
["bluetoothctl", "disconnect", dev.mac], "Disconnection successful" | |
) | |
def forget(self, dev: BluetoothDevice) -> bool: | |
return "removed" in ShellIO.execute_lookup( | |
["bluetoothctl", "remove", dev.mac], "Device has been removed" | |
) | |
# returns true if a game is runnign via the emuelecRunEmu script | |
def is_in_game(): | |
return ShellIO.execute_lookup(["ps", "-ef"], "emuelecRunEmu.sh") | |
# returns true if another process (not current) is running infinite scan | |
def is_bluetooth_running(): | |
out = ShellIO.execute(["ps", "-ef"]) | |
for o in out: | |
if re.search(r"(emuelec-bluetooth) \d+ -1", o) is not None: | |
return False if str(os.getpid()) in o else True | |
return False | |
def in_game_poll(): | |
try: | |
out = ShellIO.execute_output(['. /etc/profile &&','get_ee_setting bt_poll_game']) | |
return int(out[0][:-1]) | |
except ValueError as ve: | |
return 0; | |
def bt_ignore_forget(): | |
out = ShellIO.execute_output(['. /etc/profile &&','get_ee_setting bt_forget_off']) | |
return out[0][:-1] == "yes" | |
MACS_FILE = r'/storage/.config/bt-sixpair-macs' | |
def load_sixpair_macs(): | |
macs = [] | |
if os.path.isfile(MACS_FILE): | |
macs_file = open(MACS_FILE, "r") | |
for mac in macs_file: | |
macs.append(mac[:-1]) | |
return macs | |
def save_sixpair_macs(macs): | |
macs_file = open(MACS_FILE, "w") | |
for mac in macs: | |
macs_file.write(f'{mac}\n') | |
LIFETIME = 30 | |
SCAN_INTERVAL=5 | |
SCAN_TIME = 8 | |
CONNECT_ATTEMPTS = 5 | |
MAX_SCAN_TIME = 30 | |
MAX_ATTEMPTS = 3 | |
if __name__ == "__main__": | |
scan_interval = SCAN_INTERVAL | |
if len(sys.argv) > 1: | |
LIFETIME = int(sys.argv[1]) | |
if len(sys.argv) > 2: | |
scan_interval = int(sys.argv[2]) | |
if len(sys.argv) > 3: | |
SCAN_TIME = int(sys.argv[3]) | |
if len(sys.argv) > 4: | |
CONNECT_ATTEMPTS = int(sys.argv[4]) | |
bt = BluetoothCTL() | |
while (is_bluetooth_running()): | |
if (time.time() - start_time < LIFETIME): | |
sys.exit() | |
time.sleep(2) | |
bt.power = True | |
bt.agent = True | |
bt.discoverable = True | |
bt.pairable = True | |
start_time = time.time() | |
scan_active = False | |
iteration = 1 | |
# game polling for controllers that lose power during game, | |
# they can be re-scanned still. | |
in_game_poll_interval = in_game_poll() | |
sixpair_macs = load_sixpair_macs() # sixpair code | |
while (time.time() - start_time < LIFETIME): | |
print( | |
"Scanning available devices for up to {} seconds, with interval {}, please wait...".format( | |
SCAN_TIME, scan_interval | |
) | |
) | |
SCAN_INTERVAL = scan_interval | |
# decide whether we should stop or resume scanning and pairing | |
should_scan = not (is_in_game() and in_game_poll_interval == 0) | |
if should_scan and not scan_active: | |
print("emustation is active now, resuming scan") | |
if in_game_poll_interval > 0: | |
SCAN_INTERVAL = in_game_poll_interval | |
bt.scan_async(timeout=MAX_SCAN_TIME) | |
scan_active = True | |
elif not should_scan and scan_active: | |
print("emustation is inactive now, stopping scan") | |
bt.scan_stop() | |
scan_active = False | |
print("Starting scan {}, please wait...".format(iteration)) | |
time.sleep(2) | |
dev_ignore = [] | |
dev_attempts = {} | |
# stop if a game is running and script not enable for in-game polling. | |
should_scan = not (is_in_game() and in_game_poll_interval == 0) | |
if not should_scan: | |
continue | |
for i in range(CONNECT_ATTEMPTS): | |
print("Starting connect attempt {}, please wait...".format(i + 1)) | |
devices = bt.devices() | |
for dev in devices: | |
if True: | |
print(dev) | |
if dev.blocked: | |
continue | |
if not dev.mac in dev_attempts: | |
dev_attempts[dev.mac] = 0 | |
# Ignore connections that have failed several times or dont need connecting. | |
if dev_attempts[dev.mac] >= MAX_ATTEMPTS: # Try to connect attempts in a single scan. | |
continue | |
if ( | |
not dev.connected # exclude already connected devices | |
and (dev.rssi != 0) | |
and "input-" in dev.icon # exclude any non-input device | |
): | |
if not dev.trusted: | |
print("trusting a device {}".format(dev.name)) | |
if bt.trust(dev): | |
dev.trusted = True | |
else: | |
print("device {} not trusted.".format(dev.name)) | |
if ( # This method is for some controllers that show connect right away but not trusted. | |
dev.connected and not dev.trusted | |
and "input-" in dev.icon | |
): | |
print( | |
"trusting and pairing device {}, mac: {}, icon: {}, paired: {}".format( | |
dev.name, dev.mac, dev.icon, dev.paired | |
) | |
) | |
print("trusting device {}".format(dev.alias)) | |
if bt.trust(dev): | |
print("pairing device {}".format(dev.alias)) | |
bt.pair(dev) | |
if dev.mac not in sixpair_macs: | |
sixpair_macs.append(dev.mac) # once trusted list so it doesn't go through connecting. | |
print("successfully trusted {} now connect cable.".format(dev.name)) | |
continue | |
else: | |
print("device {} not trusted.".format(dev.name)) | |
if ( | |
dev.trusted | |
and not (dev.connected and dev.paired) | |
and not dev.mac in sixpair_macs # sixpair code | |
): | |
# Only needs pairing if the device is not paired. | |
if not dev.paired: | |
print("pairing a trusted device {}".format(dev.name)) | |
if not bt.pair(dev): | |
dev_attempts[dev.mac] += 1 | |
continue | |
print("connecting a paired device {}".format(dev.name)) | |
if not dev.connected: | |
if bt.connect(dev): | |
print("successfully connected {}".format(dev.name)) | |
else: | |
# on failure add attempt and forget device so can re-try. | |
print("failed to connect {}, try again.".format(dev.name)) | |
dev_attempts[dev.mac] += 1 | |
if dev_attempts[dev.mac] >= (MAX_ATTEMPTS-1) and not bt_ignore_forget: | |
print("forgetting device {}.".format(dev.name)) | |
bt.forget(dev) | |
time.sleep(SCAN_TIME // CONNECT_ATTEMPTS) | |
print("Stopped scan {}.".format(iteration)) | |
bt.scan_stop() | |
scan_active = False | |
iteration += 1 | |
save_sixpair_macs(sixpair_macs) # sixpair code | |
time.sleep(SCAN_INTERVAL) # scan break in-between so devices that dont use this script can connect too in-between scans. | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment