Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Bluetoothctl wrapper in Python
# Based on ReachView code from Egor Fedorov (egor.fedorov@emlid.com)
# Updated for Python 3.6.8 on a Raspberry Pi
import time
import pexpect
import subprocess
import sys
import logging
logger = logging.getLogger("btctl")
class Bluetoothctl:
"""A wrapper for bluetoothctl utility."""
def __init__(self):
subprocess.check_output("rfkill unblock bluetooth", shell=True)
self.process = pexpect.spawnu("bluetoothctl", echo=False)
def send(self, command, pause=0):
self.process.send(f"{command}\n")
time.sleep(pause)
if self.process.expect(["bluetooth", pexpect.EOF]):
raise Exception(f"failed after {command}")
def get_output(self, *args, **kwargs):
"""Run a command in bluetoothctl prompt, return output as a list of lines."""
self.send(*args, **kwargs)
return self.process.before.split("\r\n")
def start_scan(self):
"""Start bluetooth scanning process."""
try:
self.send("scan on")
except Exception as e:
logger.error(e)
def make_discoverable(self):
"""Make device discoverable."""
try:
self.send("discoverable on")
except Exception as e:
logger.error(e)
def parse_device_info(self, info_string):
"""Parse a string corresponding to a device."""
device = {}
block_list = ["[\x1b[0;", "removed"]
if not any(keyword in info_string for keyword in block_list):
try:
device_position = info_string.index("Device")
except ValueError:
pass
else:
if device_position > -1:
attribute_list = info_string[device_position:].split(" ", 2)
device = {
"mac_address": attribute_list[1],
"name": attribute_list[2],
}
return device
def get_available_devices(self):
"""Return a list of tuples of paired and discoverable devices."""
available_devices = []
try:
out = self.get_output("devices")
except Exception as e:
logger.error(e)
else:
for line in out:
device = self.parse_device_info(line)
if device:
available_devices.append(device)
return available_devices
def get_paired_devices(self):
"""Return a list of tuples of paired devices."""
paired_devices = []
try:
out = self.get_output("paired-devices")
except Exception as e:
logger.error(e)
else:
for line in out:
device = self.parse_device_info(line)
if device:
paired_devices.append(device)
return paired_devices
def get_discoverable_devices(self):
"""Filter paired devices out of available."""
available = self.get_available_devices()
paired = self.get_paired_devices()
return [d for d in available if d not in paired]
def get_device_info(self, mac_address):
"""Get device info by mac address."""
try:
out = self.get_output(f"info {mac_address}")
except Exception as e:
logger.error(e)
return False
else:
return out
def pair(self, mac_address):
"""Try to pair with a device by mac address."""
try:
self.send(f"pair {mac_address}", 4)
except Exception as e:
logger.error(e)
return False
else:
res = self.process.expect(
["Failed to pair", "Pairing successful", pexpect.EOF]
)
return res == 1
def trust(self, mac_address):
try:
self.send(f"trust {mac_address}", 4)
except Exception as e:
logger.error(e)
return False
else:
res = self.process.expect(
["Failed to trust", "Pairing successful", pexpect.EOF]
)
return res == 1
def remove(self, mac_address):
"""Remove paired device by mac address, return success of the operation."""
try:
self.send(f"remove {mac_address}", 3)
except Exception as e:
logger.error(e)
return False
else:
res = self.process.expect(
["not available", "Device has been removed", pexpect.EOF]
)
return res == 1
def connect(self, mac_address):
"""Try to connect to a device by mac address."""
try:
self.send(f"connect {mac_address}", 2)
except Exception as e:
logger.error(e)
return False
else:
res = self.process.expect(
["Failed to connect", "Connection successful", pexpect.EOF]
)
return res == 1
def disconnect(self, mac_address):
"""Try to disconnect to a device by mac address."""
try:
self.send(f"disconnect {mac_address}", 2)
except Exception as e:
logger.error(e)
return False
else:
res = self.process.expect(
["Failed to disconnect", "Successful disconnected", pexpect.EOF]
)
return res == 1
@filipe-aguiar

This comment has been minimized.

Copy link

@filipe-aguiar filipe-aguiar commented Apr 19, 2019

f"{command}\n" is not accepted by python3.

@Luthor2k

This comment has been minimized.

Copy link

@Luthor2k Luthor2k commented Sep 16, 2019

@filipe-aguiar check that you're running in python3.6 or above, 3.5 does not support this f-string format.

@filipe-aguiar

This comment has been minimized.

Copy link

@filipe-aguiar filipe-aguiar commented Sep 16, 2019

@Luthor2k
Thanks for the tip. Yes, it works now.

@lalten

This comment has been minimized.

Copy link

@lalten lalten commented Mar 21, 2020

Doesn't work for me :/

$ sudo ipython3
Python 3.6.9 (default, Nov  7 2019, 10:44:02) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.13.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from bluetoothctl import Bluetoothctl                                                                                                                  

In [2]: B = Bluetoothctl()                                                                                                                                     

In [3]: B.get_available_devices()                                                                                                                              
Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0x7f98ebb352e8>
command: /usr/bin/bluetoothctl
args: [b'/usr/bin/bluetoothctl']
buffer (last 100 chars): '01:01:54:00 WHG 605 (021504)\r\nDevice F8:A0:B8:0A:C6:CD vívosmart HR\r\n\x1b[0;94m[WHG 605 (021504)]\x1b[0m# '
before (last 100 chars): '01:01:54:00 WHG 605 (021504)\r\nDevice F8:A0:B8:0A:C6:CD vívosmart HR\r\n\x1b[0;94m[WHG 605 (021504)]\x1b[0m# '
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 11946
child_fd: 15
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
    0: re.compile("bluetooth")
    1: EOF
Out[3]: []

In [4]: B.connect('E4:85:01:01:54:00')                                                                                                                         
Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0x7f98ebb352e8>
command: /usr/bin/bluetoothctl
args: [b'/usr/bin/bluetoothctl']
buffer (last 100 chars): '# Attempting to connect to E4:85:01:01:54:00\r\n\x1b[0;94m[WHG 605 (021504)]\x1b[0m# Connection successful\r\n'
before (last 100 chars): '# Attempting to connect to E4:85:01:01:54:00\r\n\x1b[0;94m[WHG 605 (021504)]\x1b[0m# Connection successful\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 11946
child_fd: 15
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
    0: re.compile("bluetooth")
    1: EOF
Out[4]: False

@mrgfisher

This comment has been minimized.

Copy link

@mrgfisher mrgfisher commented Mar 21, 2020

Hi,

Think you need to B.start_scan()

If that doesn't work I'll post my (simple!) code that works as an example.

@faberast

This comment has been minimized.

Copy link

@faberast faberast commented Aug 24, 2020

I had the 'Timeout exceeded' error mentioned by @lalten, too, until I added 'pexpect.TIMEOUT' to the pattern list of the expect() calls.
e.g.
self.process.expect(["bluetooth", pexpect.EOF, pexpect.TIMEOUT])

Although that helps, I found another issue that was sometimes causing the timeouts. On line 25 (in the 'send' method), this code is looking for the end of output from the previously-executed command by checking for the term 'bluetooth' which typically appears in brackets at the start of the next prompt (within the bluetoothctl utility. In my experience, sometimes the word bluetooth is actually replaced by the name of a connected device, which causes the code to keep waiting for the word bluetooth to appear until the timeout value is reached. In my case, the prompt always starts with '#' so using that as the search time solves the problem:

if self.process.expect(["#", pexpect.EOF, pexpect.TIMEOUT]):

I have not seen the pound sign in any of the rest of bluetoothctl's output, so this works.

@santhosh12992

This comment has been minimized.

Copy link

@santhosh12992 santhosh12992 commented Aug 31, 2020

How we can extend this code to support the read and write operations.

Basically we need to "connect" then "menu gatt" to access the bluetoothctl gatt commands.

@NerdboyQ

This comment has been minimized.

Copy link

@NerdboyQ NerdboyQ commented Oct 3, 2020

I can discover my devices with no problem, but I need to pair with a pin. I am trying to pair with an arduino that has a HC-05 BT module attached. Is there an option to force the pin to specific number?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment