Skip to content

Instantly share code, notes, and snippets.

@Langerz82
Last active July 5, 2023 07:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Langerz82/6838a6c44232902a47182658c55cc00f to your computer and use it in GitHub Desktop.
Save Langerz82/6838a6c44232902a47182658c55cc00f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import re
import sys
import time
from dataclasses import dataclass
from subprocess import DEVNULL, PIPE, Popen, check_output
DEBUG = False
@dataclass
class BluetoothDevice:
    """State of one Bluetooth device, keyed by its MAC address.

    Only ``mac`` is required.  Every other field has an "unknown" default
    (empty string, ``False`` or ``0``) so a device can be created from its
    address alone and filled in afterwards.
    """

    mac: str  # device address; passed to bluetoothctl to identify the device
    name: str = ""
    alias: str = ""
    clas: str = ""  # device class ("class" is a reserved word)
    icon: str = ""
    paired: bool = False
    bounded: bool = False  # NOTE(review): presumably "bonded"; name kept for callers
    trusted: bool = False
    blocked: bool = False
    connected: bool = False
    wake_allowed: bool = False
    legacy_pairing: bool = False
    rssi: int = 0  # 0 doubles as "no signal strength known"
class ShellIO:
    """Static helpers for running external commands."""

    @staticmethod
    def execute(cmd: list[str]) -> list[str]:
        """Run *cmd*, wait for it to finish and return its stdout lines.

        Lines are returned as read from the pipe, i.e. including their
        trailing newline.  When the module-level ``DEBUG`` flag is set the
        command and every output line are echoed.
        """
        if DEBUG:
            print("> " + " ".join(cmd))
        out: list[str] = []
        with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
            for line in p.stdout:
                out.append(line)
                if DEBUG:
                    print(line)
        return out

    @staticmethod
    def execute_lookup(cmd: list[str], match: str) -> str:
        """Run *cmd* and return the first output line containing *match*.

        Returns an empty string when no line matches.
        """
        for line in ShellIO.execute(cmd):
            if match in line:
                return line
        return ""

    @staticmethod
    def execute_async(cmd: list[str]):
        """Start *cmd* in the background without waiting for it.

        The output is discarded: nothing ever reads it, and an unread pipe
        would stall the child once the pipe buffer fills up.
        """
        Popen(cmd, stdout=DEVNULL)

    @staticmethod
    def execute_output(cmd: list[str]):
        """Join *cmd* with spaces, run it through the shell, return stdout lines.

        Unlike ``execute`` this goes through ``os.popen``, so shell syntax
        such as ``&&`` inside the arguments is interpreted.
        """
        exec_in = ' '.join(cmd)
        # The context manager closes the pipe and reaps the shell.
        with os.popen(exec_in) as stream:
            return stream.readlines()
class BluetoothCTL:
    """Wrapper that drives the ``bluetoothctl`` command-line tool.

    Every operation shells out through ``ShellIO`` and interprets the text
    that ``bluetoothctl`` prints.
    """

    # ``bluetoothctl info`` keys copied verbatim into string fields.
    _TEXT_KEYS = {"Name": "name", "Alias": "alias", "Class": "clas", "Icon": "icon"}
    # ``bluetoothctl info`` keys whose value is "yes" or "no".
    _FLAG_KEYS = {
        "Paired": "paired",
        "Bonded": "bounded",  # spelling printed by BlueZ
        "Bounded": "bounded",  # spelling this script originally looked for
        "Trusted": "trusted",
        "Blocked": "blocked",
        "Connected": "connected",
        "WakeAllowed": "wake_allowed",
        "LegacyPairing": "legacy_pairing",
    }
    _MAC_RE = re.compile(r"\bDevice\s+((?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2})\b")

    @staticmethod
    def _parse_rssi(value: str) -> int:
        """Convert the text after ``RSSI:`` to an int, 0 if unparsable.

        Handles both the plain form (``-60``) and the form that carries the
        signed value in parentheses (``0xffc4 (-60)``).
        """
        signed = re.search(r"\((-?\d+)\)", value)
        if signed:
            return int(signed.group(1))
        try:
            return int(value.split()[0], 0)
        except (IndexError, ValueError):
            return 0

    @staticmethod
    def _parse_info_lines(lines) -> dict:
        """Turn ``Key: value`` lines into ``BluetoothDevice`` field values.

        Only the first ":" separates key from value, so values that contain
        colons themselves are kept intact.  Unknown keys are ignored.
        """
        values = {}
        for line in lines:
            key, sep, value = line.partition(":")
            if not sep:
                continue
            key = key.strip()
            value = value.strip()
            if key in BluetoothCTL._TEXT_KEYS:
                values[BluetoothCTL._TEXT_KEYS[key]] = value
            elif key in BluetoothCTL._FLAG_KEYS:
                values[BluetoothCTL._FLAG_KEYS[key]] = value == "yes"
            elif key == "RSSI":
                values["rssi"] = BluetoothCTL._parse_rssi(value)
        return values

    @staticmethod
    def _extract_mac(line: str) -> str:
        """Return the MAC following the word ``Device`` in *line*, or ``""``."""
        found = BluetoothCTL._MAC_RE.search(line)
        return found.group(1) if found else ""

    def _parse_device_info(self, mac: str) -> "BluetoothDevice":
        """Query ``bluetoothctl info <mac>`` and build a ``BluetoothDevice``.

        Fields that are missing from the output keep their "unknown" value
        (empty string, ``False`` or ``0``).
        """
        info = ShellIO.execute(["bluetoothctl", "info", mac])
        fields = {
            "name": "",
            "alias": "",
            "clas": "",
            "icon": "",
            "paired": False,
            "bounded": False,
            "trusted": False,
            "blocked": False,
            "connected": False,
            "wake_allowed": False,
            "legacy_pairing": False,
            "rssi": 0,
        }
        fields.update(self._parse_info_lines(info))
        return BluetoothDevice(mac=mac, **fields)

    @property
    def power(self) -> bool:
        """Whether the controller reports ``Powered: yes``."""
        return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Powered")

    @power.setter
    def power(self, value):
        ShellIO.execute(["bluetoothctl", "power", "on" if value else "off"])

    @property
    def discoverable(self) -> bool:
        """Whether the controller reports ``Discoverable: yes``."""
        return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discoverable")

    @discoverable.setter
    def discoverable(self, value):
        ShellIO.execute(["bluetoothctl", "discoverable", "on" if value else "off"])

    @property
    def pairable(self) -> bool:
        """Whether the controller reports ``Pairable: yes``."""
        return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable")

    @pairable.setter
    def pairable(self, value):
        ShellIO.execute(["bluetoothctl", "pairable", "on" if value else "off"])

    def _set_agent(self, value):
        ShellIO.execute(["bluetoothctl", "agent", "on" if value else "off"])

    # Write-only: there is no getter because nothing here queries agent state.
    agent = property(fset=_set_agent, doc="Switch the bluetoothctl agent on or off.")

    def scan(self, timeout=10):
        """Run a blocking scan for *timeout* seconds."""
        ShellIO.execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])

    def scan_async(self, timeout=10):
        """Start a background scan; a *timeout* of 0 or less means no limit."""
        if timeout > 0:
            ShellIO.execute_async(
                ["bluetoothctl", "--timeout", str(timeout), "scan", "on"]
            )
        else:
            ShellIO.execute_async(["bluetoothctl", "scan", "on"])

    def scan_stop(self):
        """Kill any process whose command line contains ``bluetoothctl scan on``."""
        ShellIO.execute_async(["pkill", "-f", "bluetoothctl scan on"])

    def devices(self) -> "list[BluetoothDevice]":
        """Return detailed info for every device ``bluetoothctl devices`` lists.

        Lines that do not carry a ``Device <mac>`` entry are skipped.
        """
        listing = ShellIO.execute(["bluetoothctl", "devices"])
        macs = (self._extract_mac(line) for line in listing)
        return [self._parse_device_info(mac) for mac in macs if mac]

    def trust(self, dev: "BluetoothDevice") -> bool:
        """Trust *dev*; True when bluetoothctl reports ``trust succeeded``."""
        return "succeeded" in ShellIO.execute_lookup(
            ["bluetoothctl", "trust", dev.mac], "trust succeeded"
        )

    def untrust(self, dev: "BluetoothDevice") -> bool:
        """Untrust *dev*; True when bluetoothctl reports ``untrust succeeded``."""
        return "succeeded" in ShellIO.execute_lookup(
            ["bluetoothctl", "untrust", dev.mac], "untrust succeeded"
        )

    def pair(self, dev: "BluetoothDevice") -> bool:
        """Pair with *dev*; True when bluetoothctl reports ``Pairing successful``."""
        return "successful" in ShellIO.execute_lookup(
            ["bluetoothctl", "pair", dev.mac], "Pairing successful"
        )

    def connect(self, dev: "BluetoothDevice") -> bool:
        """Connect to *dev*; True when bluetoothctl reports ``Connection successful``."""
        return "successful" in ShellIO.execute_lookup(
            ["bluetoothctl", "connect", dev.mac], "Connection successful"
        )

    # Added by flying wang.
    def disconnect(self, dev: "BluetoothDevice") -> bool:
        """Disconnect *dev*; True when ``Disconnection successful`` is reported."""
        return "successful" in ShellIO.execute_lookup(
            ["bluetoothctl", "disconnect", dev.mac], "Disconnection successful"
        )

    def forget(self, dev: "BluetoothDevice") -> bool:
        """Remove *dev*; True when ``Device has been removed`` is reported."""
        return "removed" in ShellIO.execute_lookup(
            ["bluetoothctl", "remove", dev.mac], "Device has been removed"
        )
# Returns True if a game is running via the emuelecRunEmu script.
def is_in_game() -> bool:
    """Report whether ``ps -ef`` lists a line containing ``emuelecRunEmu.sh``."""
    # execute_lookup returns the matching line (or ""); callers only need a flag.
    return bool(ShellIO.execute_lookup(["ps", "-ef"], "emuelecRunEmu.sh"))
def _other_scanner_running(ps_lines, own_pid) -> bool:
    """Return True if *ps_lines* lists an infinite-scan instance other than *own_pid*."""
    own = str(own_pid)
    for line in ps_lines:
        if re.search(r"(emuelec-bluetooth) \d+ -1", line) is None:
            continue
        # Compare whole columns so that PID 10 is not mistaken for PID 100.
        if own not in line.split():
            return True
        # This line is our own process: keep looking for another instance.
    return False


# Returns True if another process (not the current one) is running an infinite scan.
def is_bluetooth_running():
    """Check ``ps -ef`` for another ``emuelec-bluetooth <n> -1`` process."""
    return _other_scanner_running(ShellIO.execute(["ps", "-ef"]), os.getpid())
def in_game_poll():
    """Return the ``bt_poll_game`` setting as an int, or 0 if it is unusable.

    The value is read by sourcing ``/etc/profile`` and calling
    ``get_ee_setting`` in a shell.  Empty or non-numeric output yields 0.
    """
    out = ShellIO.execute_output(['. /etc/profile &&', 'get_ee_setting bt_poll_game'])
    try:
        return int(out[0].strip())
    except (IndexError, ValueError):
        # IndexError: the command printed nothing; ValueError: not a number.
        return 0
def bt_ignore_forget():
    """Return True when the ``bt_forget_off`` setting reads ``yes``.

    The value is read by sourcing ``/etc/profile`` and calling
    ``get_ee_setting`` in a shell.  No output at all counts as False.
    """
    out = ShellIO.execute_output(['. /etc/profile &&', 'get_ee_setting bt_forget_off'])
    return bool(out) and out[0].strip() == "yes"
MACS_FILE = r'/storage/.config/bt-sixpair-macs'


def load_sixpair_macs(path=None):
    """Read the stored MAC addresses, one per line.

    Args:
        path: file to read; defaults to ``MACS_FILE``.

    Returns:
        The non-blank lines with surrounding whitespace removed, or an
        empty list when the file does not exist.
    """
    if path is None:
        path = MACS_FILE
    if not os.path.isfile(path):
        return []
    with open(path, "r") as macs_file:
        stripped = (line.strip() for line in macs_file)
        # strip() instead of [:-1]: the last line may lack a newline.
        return [mac for mac in stripped if mac]
def save_sixpair_macs(macs, path=None):
    """Write *macs* to disk, one address per line, replacing the file.

    Args:
        macs: iterable of MAC address strings.
        path: file to write; defaults to ``MACS_FILE``.
    """
    if path is None:
        path = MACS_FILE
    # The context manager guarantees the data is flushed and the file closed.
    with open(path, "w") as macs_file:
        macs_file.writelines(f'{mac}\n' for mac in macs)
# Defaults; the first four can be overridden positionally on the command line:
#   <lifetime> <scan interval> <scan time> <connect attempts>
LIFETIME = 30
SCAN_INTERVAL = 5
SCAN_TIME = 8
CONNECT_ATTEMPTS = 5
MAX_SCAN_TIME = 30
MAX_ATTEMPTS = 3

if __name__ == "__main__":
    scan_interval = SCAN_INTERVAL
    if len(sys.argv) > 1:
        LIFETIME = int(sys.argv[1])
    if len(sys.argv) > 2:
        scan_interval = int(sys.argv[2])
    if len(sys.argv) > 3:
        SCAN_TIME = int(sys.argv[3])
    if len(sys.argv) > 4:
        CONNECT_ATTEMPTS = int(sys.argv[4])

    bt = BluetoothCTL()

    # Wait for another scanning instance to finish, giving up after LIFETIME
    # seconds.  The timer must exist before the loop reads it.
    # NOTE(review): the original compared with "<", which made the sleep
    # unreachable; a wait-with-timeout is assumed to be the intent.
    wait_start = time.time()
    while is_bluetooth_running():
        if time.time() - wait_start >= LIFETIME:
            sys.exit()
        time.sleep(2)

    bt.power = True
    bt.agent = True
    bt.discoverable = True
    bt.pairable = True

    start_time = time.time()
    scan_active = False
    iteration = 1

    # game polling for controllers that lose power during game,
    # they can be re-scanned still.
    in_game_poll_interval = in_game_poll()

    sixpair_macs = load_sixpair_macs()  # sixpair code

    while time.time() - start_time < LIFETIME:
        print(
            "Scanning available devices for up to {} seconds, with interval {}, please wait...".format(
                SCAN_TIME, scan_interval
            )
        )
        SCAN_INTERVAL = scan_interval

        # decide whether we should stop or resume scanning and pairing
        should_scan = not (is_in_game() and in_game_poll_interval == 0)
        if should_scan and not scan_active:
            print("emustation is active now, resuming scan")
            if in_game_poll_interval > 0:
                SCAN_INTERVAL = in_game_poll_interval
            bt.scan_async(timeout=MAX_SCAN_TIME)
            scan_active = True
        elif not should_scan and scan_active:
            print("emustation is inactive now, stopping scan")
            bt.scan_stop()
            scan_active = False

        print("Starting scan {}, please wait...".format(iteration))
        time.sleep(2)

        # failed pair/connect attempts per MAC, reset for every scan
        dev_attempts = {}

        # stop if a game is running and script not enabled for in-game polling.
        should_scan = not (is_in_game() and in_game_poll_interval == 0)
        if not should_scan:
            continue

        for i in range(CONNECT_ATTEMPTS):
            print("Starting connect attempt {}, please wait...".format(i + 1))
            devices = bt.devices()
            for dev in devices:
                print(dev)
                if dev.blocked:
                    continue

                dev_attempts.setdefault(dev.mac, 0)

                # Ignore connections that have failed several times or dont need connecting.
                if dev_attempts[dev.mac] >= MAX_ATTEMPTS:  # Try to connect attempts in a single scan.
                    continue

                if (
                    not dev.connected  # exclude already connected devices
                    and (dev.rssi != 0)
                    and "input-" in dev.icon  # exclude any non-input device
                ):
                    if not dev.trusted:
                        print("trusting a device {}".format(dev.name))
                        if bt.trust(dev):
                            dev.trusted = True
                        else:
                            print("device {} not trusted.".format(dev.name))

                if (  # This method is for some controllers that show connect right away but not trusted.
                    dev.connected and not dev.trusted
                    and "input-" in dev.icon
                ):
                    print(
                        "trusting and pairing device {}, mac: {}, icon: {}, paired: {}".format(
                            dev.name, dev.mac, dev.icon, dev.paired
                        )
                    )
                    print("trusting device {}".format(dev.alias))
                    if bt.trust(dev):
                        print("pairing device {}".format(dev.alias))
                        bt.pair(dev)
                        if dev.mac not in sixpair_macs:
                            sixpair_macs.append(dev.mac)  # once trusted list so it doesn't go through connecting.
                        print("successfully trusted {} now connect cable.".format(dev.name))
                        continue
                    else:
                        print("device {} not trusted.".format(dev.name))

                if (
                    dev.trusted
                    and not (dev.connected and dev.paired)
                    and dev.mac not in sixpair_macs  # sixpair code
                ):
                    # Only needs pairing if the device is not paired.
                    if not dev.paired:
                        print("pairing a trusted device {}".format(dev.name))
                        if not bt.pair(dev):
                            dev_attempts[dev.mac] += 1
                            continue

                    print("connecting a paired device {}".format(dev.name))
                    if not dev.connected:
                        if bt.connect(dev):
                            print("successfully connected {}".format(dev.name))
                        else:
                            # on failure add attempt and forget device so can re-try.
                            print("failed to connect {}, try again.".format(dev.name))
                            dev_attempts[dev.mac] += 1
                            # bt_ignore_forget must be *called*; the bare function
                            # object is always truthy, so nothing was ever forgotten.
                            if dev_attempts[dev.mac] >= (MAX_ATTEMPTS - 1) and not bt_ignore_forget():
                                print("forgetting device {}.".format(dev.name))
                                bt.forget(dev)

            time.sleep(SCAN_TIME // CONNECT_ATTEMPTS)

        print("Stopped scan {}.".format(iteration))
        bt.scan_stop()
        scan_active = False
        iteration += 1
        save_sixpair_macs(sixpair_macs)  # sixpair code
        time.sleep(SCAN_INTERVAL)  # scan break in-between so devices that dont use this script can connect too in-between scans.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment