-
-
Save cdove99/db9c42b3c6b5edb658f9b4773eab9fdb to your computer and use it in GitHub Desktop.
Bluetoothctl wrapper in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based on ReachView code from Egor Fedorov (egor.fedorov@emlid.com)
# Updated for Python 3.6.8 on a Raspberry Pi
import logging
import subprocess
import sys
import time

import pexpect
# Module-level logger; the wrapper reports failed bluetoothctl commands here.
logger = logging.getLogger("btctl")
def boot2state(b):
    """Translate a boolean into the "on"/"off" keyword used in commands.

    Args:
        b: A bool, or any other value that should be used verbatim.

    Returns:
        "on" for True, "off" for False; any non-bool value is returned
        unchanged (so a caller may pass "on"/"off" directly).
    """
    # Only genuine bools are translated; ints such as 0/1 pass through as-is.
    if isinstance(b, bool):
        return "on" if b else "off"
    return b
class Bluetoothctl:
    """A wrapper for the bluetoothctl utility.

    Creating an instance runs ``rfkill unblock bluetooth`` and spawns an
    interactive ``bluetoothctl`` child process through pexpect. Every
    method writes one command to that process and inspects its output.
    """

    def __init__(self):
        """Unblock the bluetooth radio and start the bluetoothctl process."""
        subprocess.check_output("rfkill unblock bluetooth", shell=True)
        self.process = pexpect.spawnu("bluetoothctl", echo=False)

    def send(self, command, pause=0):
        """Send a command and wait until the prompt text shows up again.

        Args:
            command: Command line to send, without the trailing newline.
            pause: Seconds to sleep after sending, before waiting for output.

        Raises:
            Exception: If the process reaches EOF before "bluetooth" appears
                in its output.
        """
        self.process.send(f"{command}\n")
        time.sleep(pause)
        # expect() returns the index of the pattern that matched:
        # 0 is the "bluetooth" prompt text, 1 is EOF (the process went away).
        if self.process.expect(["bluetooth", pexpect.EOF]) != 0:
            raise Exception(f"failed after {command}")

    def get_output(self, *args, **kwargs):
        """Run a command in bluetoothctl prompt, return output as a list of lines.

        Arguments are forwarded unchanged to ``send``; the text captured
        before the matched prompt is split on CRLF.
        """
        self.send(*args, **kwargs)
        return self.process.before.split("\r\n")

    def _set_state(self, setting, state, pause=0):
        """Send "<setting> <on|off>"; log, rather than raise, on failure."""
        try:
            self.send(f"{setting} {boot2state(state)}", pause)
        except Exception as e:
            logger.error(e)

    def set_scan(self, state):
        """Set state of bluetooth scanning process."""
        self._set_state("scan", state)

    def set_discoverable(self, state):
        """Set device discoverable state."""
        self._set_state("discoverable", state, 2)

    def set_power(self, state):
        """Set device power state."""
        self._set_state("power", state, 2)

    def set_pairable(self, state):
        """Set device pairable state."""
        self._set_state("pairable", state)

    def parse_device_info(self, info_string):
        """Parse a string corresponding to a device.

        Args:
            info_string: One line of bluetoothctl output.

        Returns:
            ``{"mac_address": ..., "name": ...}`` taken from the two fields
            following the word "Device", or an empty dict when the line is
            block-listed, does not contain "Device", or lacks either field.
        """
        block_list = ["[\x1b[0;", "removed"]
        if any(keyword in info_string for keyword in block_list):
            return {}

        device_position = info_string.find("Device")
        if device_position == -1:
            return {}

        # "Device <mac> <name>": the name may itself contain spaces, so split
        # at most twice.
        attribute_list = info_string[device_position:].split(" ", 2)
        if len(attribute_list) < 3:
            # Incomplete line: skip it instead of raising IndexError.
            return {}
        return {
            "mac_address": attribute_list[1],
            "name": attribute_list[2],
        }

    def _list_devices(self, command):
        """Run a listing command and return the device dicts parsed from it."""
        try:
            out = self.get_output(command)
        except Exception as e:
            logger.error(e)
            return []
        return [device for device in map(self.parse_device_info, out) if device]

    def get_available_devices(self):
        """Return a list of dicts describing paired and discoverable devices.

        Returns an empty list if the "devices" command fails.
        """
        return self._list_devices("devices")

    def get_paired_devices(self):
        """Return a list of dicts describing paired devices.

        Returns an empty list if the "paired-devices" command fails.
        """
        return self._list_devices("paired-devices")

    def get_discoverable_devices(self):
        """Filter paired devices out of available."""
        available = self.get_available_devices()
        paired = self.get_paired_devices()
        return [d for d in available if d not in paired]

    def get_device_info(self, mac_address):
        """Get device info by mac address.

        Returns:
            The output of "info <mac_address>" as a list of lines, or False
            if sending the command failed.
        """
        try:
            return self.get_output(f"info {mac_address}")
        except Exception as e:
            logger.error(e)
            return False

    def _send_and_confirm(self, command, pause, failure, success):
        """Send a command, then wait for its failure or success message.

        Args:
            command: Command line to send.
            pause: Seconds to sleep after sending (see ``send``).
            failure: Pattern marking a failed operation.
            success: Pattern marking a successful operation.

        Returns:
            True only when ``success`` matched. False when sending failed,
            when ``failure`` matched, or when the process reached EOF.

        Note:
            The confirmation wait is not wrapped in ``try``; anything it
            raises (for example a pexpect timeout when neither message
            arrives) propagates to the caller.
        """
        try:
            self.send(command, pause)
        except Exception as e:
            logger.error(e)
            return False
        res = self.process.expect([failure, success, pexpect.EOF])
        return res == 1

    def pair(self, mac_address):
        """Try to pair with a device by mac address."""
        return self._send_and_confirm(
            f"pair {mac_address}", 4, "Failed to pair", "Pairing successful"
        )

    def trust(self, mac_address):
        """Try to mark a device as trusted by mac address.

        Returns:
            True when bluetoothctl confirms the change, False otherwise.
        """
        # NOTE(review): BlueZ's client reports this as
        # "Changing <address> trust succeeded" / "Failed to set <address> trust";
        # "Pairing successful" is never printed for `trust`. Confirm against
        # the installed bluetoothctl version.
        return self._send_and_confirm(
            f"trust {mac_address}",
            4,
            "Failed to (?:set|trust)",
            "trust succeeded",
        )

    def remove(self, mac_address):
        """Remove paired device by mac address, return success of the operation."""
        return self._send_and_confirm(
            f"remove {mac_address}", 3, "not available", "Device has been removed"
        )

    def connect(self, mac_address):
        """Try to connect to a device by mac address."""
        return self._send_and_confirm(
            f"connect {mac_address}", 2, "Failed to connect", "Connection successful"
        )

    def disconnect(self, mac_address=""):
        """Try to disconnect from a device by mac address.

        With the default empty address the bare "disconnect" command is sent.
        """
        return self._send_and_confirm(
            f"disconnect {mac_address}",
            2,
            "Failed to disconnect",
            "Successful disconnected",
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment