Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Langerz82/41cef0d791181dc8bc74a6a463129a2d to your computer and use it in GitHub Desktop.
Save Langerz82/41cef0d791181dc8bc74a6a463129a2d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from subprocess import Popen, PIPE
from dataclasses import dataclass
import time
import sys
DEBUG = False
@dataclass
class BluetoothDevice:
    """State of a single remote Bluetooth device, keyed by its MAC address.

    Only ``mac`` is required.  Every other field defaults to the "unknown"
    value (empty string, ``False`` or ``0``), so a device can be created
    from its address alone and filled in afterwards.  Field order is kept
    stable so existing positional and keyword construction keeps working.
    """

    mac: str  # device address as passed to bluetoothctl
    name: str = ""
    alias: str = ""
    clas: str = ""  # device class, kept as text ("class" is a Python keyword)
    icon: str = ""  # icon name; the pairing script looks for "input-" in it
    paired: bool = False
    bounded: bool = False  # NOTE(review): presumably the "bonded" flag; spelling kept for compatibility
    trusted: bool = False
    blocked: bool = False
    connected: bool = False
    wake_allowed: bool = False
    legacy_pairing: bool = False
    rssi: int = 0  # 0 doubles as "no RSSI reported"
class BluetoothCTL:
    """Thin wrapper around the ``bluetoothctl`` command-line tool.

    Every operation spawns a ``bluetoothctl`` process and interprets its
    text output; nothing is cached between calls.
    """

    # ``bluetoothctl info`` keys whose value is stored as plain text,
    # mapped to the BluetoothDevice attribute that receives it.
    _TEXT_FIELDS = {
        "Name": "name",
        "Alias": "alias",
        "Class": "clas",
        "Icon": "icon",
    }

    # ``bluetoothctl info`` keys holding a yes/no flag.  "Bonded" is the
    # spelling BlueZ is believed to print; the historical "Bounded" is still
    # accepted so nothing that matched before stops matching.
    _FLAG_FIELDS = {
        "Paired": "paired",
        "Bonded": "bounded",
        "Bounded": "bounded",
        "Trusted": "trusted",
        "Blocked": "blocked",
        "Connected": "connected",
        "WakeAllowed": "wake_allowed",
        "LegacyPairing": "legacy_pairing",
    }

    def _execute(self, cmd: list[str]) -> list[str]:
        """Run *cmd* to completion and return its stdout as a list of lines.

        Lines keep their trailing newline.  When the module-level ``DEBUG``
        flag is set, the command and every output line are echoed.
        """
        if DEBUG:
            print("> " + " ".join(cmd))
        out: list[str] = []
        with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
            for line in p.stdout:
                out.append(line)
                if DEBUG:
                    print(line)
        return out

    def _execute_lookup(self, cmd: list[str], match: str) -> str:
        """Run *cmd* and return the first output line containing *match*.

        Returns an empty string when no line matches.
        """
        return next((line for line in self._execute(cmd) if match in line), "")

    def _execute_async(self, cmd: list[str]) -> None:
        """Start *cmd* in the background without waiting for it to finish.

        The child's stdout is discarded.  Nothing ever reads it, and a pipe
        whose read end is dropped (or whose buffer fills up) can stall or
        kill the child on write, which would cut a background scan short.
        """
        from subprocess import DEVNULL

        Popen(cmd, stdout=DEVNULL)

    def _command_succeeded(self, cmd: list[str], marker: str) -> bool:
        """Run *cmd* and report whether *marker* appears in its output."""
        return bool(self._execute_lookup(cmd, marker))

    def _controller_flag(self, name: str) -> bool:
        """Return a yes/no property of the controller from ``bluetoothctl show``.

        The colon is part of the match so that ``Pairable`` cannot be
        satisfied by a ``PairableTimeout`` line (same for ``Discoverable``).
        """
        return "yes" in self._execute_lookup(["bluetoothctl", "show"], name + ":")

    @staticmethod
    def _parse_rssi(value: str) -> int:
        """Convert the text after ``RSSI:`` to an int, or 0 if unparsable.

        Accepts a bare number ("-60") and the "0xffffffc4 (-60)" layout that
        newer bluetoothctl versions appear to print, preferring the
        parenthesised decimal value when present.
        """
        text = value.strip()
        candidates = []
        start, end = text.find("("), text.rfind(")")
        if 0 <= start < end:
            candidates.append(text[start + 1:end])
        candidates.append(text)
        for candidate in candidates:
            for base in (10, 0):
                try:
                    return int(candidate.strip(), base)
                except ValueError:
                    continue
        return 0

    @classmethod
    def _parse_info_fields(cls, lines) -> dict:
        """Extract known ``Key: value`` pairs from ``bluetoothctl info`` output.

        Returns a dict mapping BluetoothDevice attribute names to parsed
        values; keys that do not occur in *lines* are absent.  The value is
        everything after the FIRST colon, so names that themselves contain
        a colon are kept whole.
        """
        fields = {}
        for line in lines:
            key, sep, value = line.partition(":")
            if not sep:
                continue
            key = key.strip()
            value = value.strip()
            if key in cls._TEXT_FIELDS:
                fields[cls._TEXT_FIELDS[key]] = value
            elif key in cls._FLAG_FIELDS:
                fields[cls._FLAG_FIELDS[key]] = value == "yes"
            elif key == "RSSI":
                fields["rssi"] = cls._parse_rssi(value)
        return fields

    def _parse_device_info(self, mac: str) -> "BluetoothDevice":
        """Query ``bluetoothctl info <mac>`` and build a BluetoothDevice.

        Fields that the tool does not report keep their neutral defaults
        (empty string, ``False``, or 0 for ``rssi``).
        """
        info = self._execute(["bluetoothctl", "info", mac])
        dev = BluetoothDevice(
            mac=mac,
            name="",
            alias="",
            clas="",
            icon="",
            paired=False,
            bounded=False,
            trusted=False,
            blocked=False,
            connected=False,
            wake_allowed=False,
            legacy_pairing=False,
            rssi=0,
        )
        if DEBUG:
            print(info)
        for attr, value in self._parse_info_fields(info).items():
            setattr(dev, attr, value)
        return dev

    @property
    def power(self) -> bool:
        """Whether the controller reports ``Powered: yes``."""
        return self._controller_flag("Powered")

    @power.setter
    def power(self, value) -> None:
        self._execute(["bluetoothctl", "power", "on" if value else "off"])

    @property
    def discoverable(self) -> bool:
        """Whether the controller reports ``Discoverable: yes``."""
        return self._controller_flag("Discoverable")

    @discoverable.setter
    def discoverable(self, value) -> None:
        self._execute(["bluetoothctl", "discoverable", "on" if value else "off"])

    @property
    def pairable(self) -> bool:
        """Whether the controller reports ``Pairable: yes``."""
        return self._controller_flag("Pairable")

    @pairable.setter
    def pairable(self, value) -> None:
        self._execute(["bluetoothctl", "pairable", "on" if value else "off"])

    def _set_agent(self, value) -> None:
        self._execute(["bluetoothctl", "agent", "on" if value else "off"])

    # Write-only: there is no getter because this class has no way to query
    # the agent state.  Reading ``agent`` raises AttributeError rather than
    # returning the unrelated pairable flag.
    agent = property(
        fset=_set_agent,
        doc="Write-only switch for ``bluetoothctl agent on|off``.",
    )

    def scan(self, timeout=10) -> None:
        """Run a discovery scan and block until *timeout* seconds elapse."""
        self._execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])

    def scan_async(self, timeout=10) -> None:
        """Start a discovery scan of *timeout* seconds in the background."""
        self._execute_async(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])

    def devices(self) -> "list[BluetoothDevice]":
        """Return detailed info for every device listed by ``bluetoothctl devices``.

        Only lines of the form ``Device <mac> ...`` are considered, so blank
        lines or stray messages in the output no longer raise IndexError.
        """
        found = []
        for line in self._execute(["bluetoothctl", "devices"]):
            parts = line.split()
            if len(parts) >= 2 and parts[0] == "Device":
                found.append(self._parse_device_info(parts[1]))
        return found

    def trust(self, dev: "BluetoothDevice") -> bool:
        """Mark *dev* as trusted; True when the tool reports success."""
        return self._command_succeeded(
            ["bluetoothctl", "trust", dev.mac], "trust succeeded"
        )

    def untrust(self, dev: "BluetoothDevice") -> bool:
        """Remove the trusted mark from *dev*; True when the tool reports success."""
        return self._command_succeeded(
            ["bluetoothctl", "untrust", dev.mac], "untrust succeeded"
        )

    def pair(self, dev: "BluetoothDevice") -> bool:
        """Pair with *dev*; True when the tool reports "Pairing successful"."""
        return self._command_succeeded(
            ["bluetoothctl", "pair", dev.mac], "Pairing successful"
        )

    def connect(self, dev: "BluetoothDevice") -> bool:
        """Connect to *dev*; True when the tool reports "Connection successful"."""
        return self._command_succeeded(
            ["bluetoothctl", "connect", dev.mac], "Connection successful"
        )

    def forget(self, dev: "BluetoothDevice") -> bool:
        """Remove *dev* from the controller; True when the tool confirms removal."""
        return self._command_succeeded(
            ["bluetoothctl", "remove", dev.mac], "Device has been removed"
        )
SCAN_TIME = 60
SCAN_INTERVAL = 15
if __name__ == "__main__":
if len(sys.argv) == 2:
SCAN_INTERVAL = int(sys.argv[1]);
if len(sys.argv) == 3:
SCAN_INTERVAL = int(sys.argv[1]);
SCAN_TIME = int(sys.argv[2]);
bt = BluetoothCTL()
bt.power = True
bt.agent = True
bt.discoverable = True
bt.pairable = True
print(
"Scanning available devices for {} seconds, with interval {}, please wait...".format(
SCAN_TIME, SCAN_INTERVAL
)
)
bt.scan_async(timeout=SCAN_TIME)
for i in range(SCAN_TIME // SCAN_INTERVAL):
time.sleep(SCAN_INTERVAL)
print("Starting scan {}, please wait...".format(i + 1))
devices = bt.devices()
for dev in devices:
print(
"detected device {}, mac: {}, icon: {}, paired: {}".format(
dev.name, dev.mac, dev.icon, dev.paired
)
)
if (
not dev.connected
and dev.rssi != 0
and "input-" in dev.icon
and len(dev.name) > 0
):
print(
"found device {}, mac: {}, icon: {}, paired: {}".format(
dev.name, dev.mac, dev.icon, dev.paired
)
)
connectFailed = False
if not dev.trusted:
# trust
if bt.trust(dev):
print("successfully trusted {}".format(dev.name))
else:
connectFailed = True
if not connectFailed and not dev.paired:
# pair
if bt.pair(dev):
print("successfully paired {}".format(dev.name))
else:
connectFailed = True
if not connectFailed and not dev.connected:
# connected
if bt.connect(dev):
print("successfully paired {}".format(dev.name))
else:
connectFailed = True
if connectFailed:
# if for any reason one step fails, forget the device so next time it can be paired
bt.forget(dev)
print("failed to pair {}, try again".format(dev.name))
elif (
dev.connected and not dev.trusted and not dev.paired
and "input-" in dev.icon
):
print(
"trusting device {}, mac: {}, icon: {}, paired: {}".format(
dev.name, dev.mac, dev.icon, dev.paired
)
)
if bt.trust(dev) and bt.pair(dev):
print("successfully paired {} now connect cable.".format(dev.name))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment