Skip to content

Instantly share code, notes, and snippets.

@cdove99
Forked from castis/bluetoothctl.py
Last active April 17, 2021 18:01
Show Gist options
  • Save cdove99/db9c42b3c6b5edb658f9b4773eab9fdb to your computer and use it in GitHub Desktop.
Save cdove99/db9c42b3c6b5edb658f9b4773eab9fdb to your computer and use it in GitHub Desktop.
Bluetoothctl wrapper in Python
# Based on ReachView code from Egor Fedorov (egor.fedorov@emlid.com)
# Updated for Python 3.6.8 on a Raspberry Pi
import time
import pexpect
import subprocess
import sys
import logging
# Module-level logger; command failures caught in this file are reported
# through it via logger.error().
logger = logging.getLogger("btctl")
def boot2state(b):
    """Translate a boolean into the "on"/"off" keyword bluetoothctl expects.

    Args:
        b: Either a bool, or any other value (typically an already
            formatted string such as "on" or "off").

    Returns:
        "on" for True, "off" for False. Any non-bool value is returned
        unchanged so callers may pass a ready-made state string.
    """
    # Only real bools are translated; ints such as 0/1 pass through untouched.
    if isinstance(b, bool):
        return "on" if b else "off"
    return b
class Bluetoothctl:
    """A wrapper for the bluetoothctl utility.

    An interactive ``bluetoothctl`` process is spawned once in the
    constructor and then driven through pexpect: commands are written to
    its prompt and its textual output is matched or returned.

    Methods that report success return a bool and log failures through the
    module-level ``logger`` instead of raising.
    """

    # Lines containing any of these substrings are not treated as device
    # listings by parse_device_info(): the first is the start of an ANSI
    # colour escape sequence, the second appears in removal messages.
    _IGNORED_MARKERS = ("[\x1b[0;", "removed")

    def __init__(self):
        """Unblock the bluetooth radio and start the bluetoothctl process.

        Raises:
            subprocess.CalledProcessError: if the rfkill command exits with
                a non-zero status.
        """
        # The command is a constant string (no untrusted input); shell=True
        # is kept so failure modes seen by callers stay the same.
        subprocess.check_output("rfkill unblock bluetooth", shell=True)
        self.process = pexpect.spawnu("bluetoothctl", echo=False)

    def send(self, command, pause=0):
        """Send a command to the bluetoothctl prompt and wait for the prompt.

        Args:
            command: Command text, without the trailing newline.
            pause: Seconds to sleep after sending, before reading output.

        Raises:
            Exception: if the process reaches EOF instead of showing
                "bluetooth" again. pexpect's own timeout exception
                propagates as well.
        """
        self.process.send(f"{command}\n")
        time.sleep(pause)
        # expect() returns the index of the matched pattern: 0 is the
        # prompt text, 1 (truthy) means the process ended.
        if self.process.expect(["bluetooth", pexpect.EOF]):
            raise Exception(f"failed after {command}")

    def get_output(self, *args, **kwargs):
        """Run a command in bluetoothctl prompt, return output as a list of lines.

        Arguments are forwarded to send(). The returned lines are the text
        received before the matched prompt, split on "\\r\\n".
        """
        self.send(*args, **kwargs)
        return self.process.before.split("\r\n")

    def _set_state(self, setting, state, pause=0):
        """Send "<setting> <on|off>" and log (rather than raise) any failure."""
        try:
            self.send(f"{setting} {boot2state(state)}", pause)
        except Exception as e:
            logger.error(e)

    def _list_devices(self, command):
        """Run a listing command and return the device dicts parsed from it."""
        try:
            lines = self.get_output(command)
        except Exception as e:
            logger.error(e)
            return []
        # parse_device_info() yields {} for lines that are not devices.
        return [device for device in map(self.parse_device_info, lines) if device]

    def _run_and_check(self, command, pause, failure, success):
        """Send a command, then report whether the success pattern appeared.

        Args:
            command: Full command line to send.
            pause: Seconds to wait after sending (see send()).
            failure: Pattern indicating the operation failed.
            success: Pattern indicating the operation succeeded.

        Returns:
            True only if ``success`` is matched first; False if the command
            could not be sent, ``failure`` matched, the process ended, or
            nothing matched before pexpect's timeout.
        """
        try:
            self.send(command, pause)
        except Exception as e:
            logger.error(e)
            return False
        # TIMEOUT is listed so a silent bluetoothctl yields False instead of
        # an exception escaping a method documented to return a bool.
        outcome = self.process.expect(
            [failure, success, pexpect.EOF, pexpect.TIMEOUT]
        )
        return outcome == 1

    def set_scan(self, state):
        """Set state of bluetooth scanning process."""
        self._set_state("scan", state)

    def set_discoverable(self, state):
        """Set device discoverable state."""
        self._set_state("discoverable", state, 2)

    def set_power(self, state):
        """Set device power state."""
        self._set_state("power", state, 2)

    def set_pairable(self, state):
        """Set device pairable state."""
        self._set_state("pairable", state)

    def parse_device_info(self, info_string):
        """Parse a string corresponding to a device.

        Args:
            info_string: One line of bluetoothctl output.

        Returns:
            ``{"mac_address": ..., "name": ...}`` when the line contains
            "Device" followed by an address; ``{}`` when the line contains
            an ignored marker, has no "Device" keyword, or has no address.
            A device line without a name yields an empty ``name``.
        """
        if any(marker in info_string for marker in self._IGNORED_MARKERS):
            return {}
        start = info_string.find("Device")
        if start == -1:
            return {}
        # At most three fields: the keyword, the address, and the name
        # (which may itself contain spaces).
        fields = info_string[start:].split(" ", 2)
        if len(fields) < 2:
            return {}
        return {
            "mac_address": fields[1],
            "name": fields[2] if len(fields) > 2 else "",
        }

    def get_available_devices(self):
        """Return a list of dicts for paired and discoverable devices."""
        return self._list_devices("devices")

    def get_paired_devices(self):
        """Return a list of dicts for paired devices."""
        # NOTE(review): newer BlueZ releases appear to have replaced
        # "paired-devices" with "devices Paired" - confirm against the
        # target bluetoothctl version.
        return self._list_devices("paired-devices")

    def get_discoverable_devices(self):
        """Filter paired devices out of available."""
        available = self.get_available_devices()
        paired = self.get_paired_devices()
        return [d for d in available if d not in paired]

    def get_device_info(self, mac_address):
        """Get device info by mac address.

        Returns:
            The output lines of "info <mac_address>", or False if the
            command could not be run.
        """
        try:
            return self.get_output(f"info {mac_address}")
        except Exception as e:
            logger.error(e)
            return False

    def pair(self, mac_address):
        """Try to pair with a device by mac address, return success as bool."""
        return self._run_and_check(
            f"pair {mac_address}", 4, "Failed to pair", "Pairing successful"
        )

    def trust(self, mac_address):
        """Try to trust a device by mac address, return success as bool."""
        # bluetoothctl reports a trust change as "Changing <mac> trust
        # succeeded"; waiting for "Pairing successful" could never match.
        return self._run_and_check(
            f"trust {mac_address}",
            4,
            r"Failed to (?:set|trust)",
            "trust succeeded",
        )

    def remove(self, mac_address):
        """Remove paired device by mac address, return success of the operation."""
        return self._run_and_check(
            f"remove {mac_address}", 3, "not available", "Device has been removed"
        )

    def connect(self, mac_address):
        """Try to connect to a device by mac address, return success as bool."""
        return self._run_and_check(
            f"connect {mac_address}", 2, "Failed to connect", "Connection successful"
        )

    def disconnect(self, mac_address=""):
        """Try to disconnect from a device by mac address, return success as bool.

        With the default empty address the bare "disconnect" command is sent.
        """
        # "Successful disconnected" is the literal text matched in the output.
        return self._run_and_check(
            f"disconnect {mac_address}",
            2,
            "Failed to disconnect",
            "Successful disconnected",
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment