Skip to content

Instantly share code, notes, and snippets.

@damouse
Last active September 6, 2018 18:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save damouse/1f5ccbb7291184a7dbc5c91b3a93f6ee to your computer and use it in GitHub Desktop.
Save damouse/1f5ccbb7291184a7dbc5c91b3a93f6ee to your computer and use it in GitHub Desktop.
Bluetoothctl text interface with asyncio in Python3.6
"""
A hilarious way to avoid writing DBus implementations by hooking into bluetoothd via the text interface
presented by bluetoothctl. You can do anything bluetoothctl can do except see live events.
In my case, I needed simple ability to pair with an Xbox One controller in python 3.6 with asyncio,
and getting dbus set up made my teeth hurt. This implementation is half serious-- it's written to back
honest-to-god production software, but I can't in good conscience advise you to ever use it.
Tested with bluez 5.41 and 5.55, python3.6, and ubuntu 16.04. The only dependency is pexpect, which you
can install with:
pip install pexpect
Written by damouse in 2018. Licensed under WTFPL. Inspired by egorf at https://gist.github.com/egorf/66d88056a9d703928f93.
"""
import asyncio
import pexpect
class AsyncBluetoothctlTextProxy:
    """Drive a spawned ``bluetoothctl`` process through its text prompt from asyncio code.

    Each public coroutine writes one command to the process and either returns the
    lines printed before the prompt reappeared, or a boolean telling whether the
    expected success message was seen.
    """

    # Tail of the prompt we wait for: the close brace, the ANSI reset code, then the hash.
    _PROMPT = ']\x1b[0m#'

    def __init__(self):
        """Spawn ``bluetoothctl`` with terminal echo turned off."""
        self.tty = pexpect.spawn("bluetoothctl", echo=False)

    async def _send(self, command, wait=0):
        """Send a command to bluetoothctl and return its output.

        Args:
            command: Command text, without the trailing newline.
            wait: Seconds to sleep after sending, before looking for the prompt.

        Returns:
            The text seen before the prompt, decoded and split on CRLF.

        Raises a pexpect timeout exception if the prompt never shows up again.
        """
        self.tty.send(command + "\n")
        await asyncio.sleep(wait)
        await self.tty.expect_exact(self._PROMPT, async_=True)
        return self.tty.before.decode().split('\r\n')

    async def _command_succeeded(self, command, wait, failure, success):
        """Run ``command``, then report whether ``success`` showed up in the output.

        The result text is awaited only after the prompt has been consumed by
        ``_send``. Returns True when ``success`` matched, False when ``failure``
        or end-of-stream matched first.
        """
        await self._send(command, wait)
        # The expect call must be awaited: with async_=True it hands back an
        # awaitable, and only its result is the index of the matched pattern.
        matched = await self.tty.expect([failure, success, pexpect.EOF], async_=True)
        return matched == 1

    def _parse_devices(self, raw_list):
        """Turn raw output lines into device tuples.

        Only lines that start with 'Device ' are kept. The prefix is removed and
        the remainder is split on its first space, so a typical entry is
        ``(address, name)``; a remainder without a space yields a 1-tuple.
        """
        prefix = 'Device '
        return [
            tuple(line[len(prefix):].split(' ', 1))
            for line in raw_list
            if line.startswith(prefix)
        ]

    async def _monitor_scan_stream(self):
        """Spin on the scanning stream and print each line as it arrives.

        The loop has no exit condition of its own; it ends only when the
        underlying expect call raises.
        """
        while True:
            # spawn.readline() takes no async_ argument, so wait for the line
            # terminator with the awaitable expect call and rebuild the line.
            await self.tty.expect_exact('\r\n', async_=True)
            line = self.tty.before + b'\r\n'
            print('Have line: ', line)

    async def teardown(self):
        """Send 'power off' and return its output lines."""
        return await self.power_off()

    async def power_on(self):
        """Send 'power on' and return the output lines."""
        return await self._send('power on')

    async def power_off(self):
        """Send 'power off' and return the output lines."""
        return await self._send('power off')

    async def agent_on(self):
        """Send 'agent on' and return the output lines."""
        return await self._send('agent on')

    async def agent_off(self):
        """Send 'agent off' and return the output lines."""
        return await self._send('agent off')

    async def default_agent(self):
        """Send 'default-agent' and return the output lines."""
        return await self._send('default-agent')

    async def scan_on(self):
        """Send 'scan on' and return the output lines."""
        return await self._send('scan on')

    async def scan_off(self):
        """Send 'scan off' and return the output lines."""
        return await self._send('scan off')

    async def discoverable_on(self):
        """Send 'discoverable on' and return the output lines."""
        return await self._send("discoverable on")

    async def discoverable_off(self):
        """Send 'discoverable off' and return the output lines."""
        return await self._send("discoverable off")

    async def devices(self):
        """Return the parsed device tuples from the 'devices' command."""
        out = await self._send("devices")
        return self._parse_devices(out)

    async def paired_devices(self):
        """Return the parsed device tuples from the 'paired-devices' command."""
        out = await self._send("paired-devices")
        return self._parse_devices(out)

    async def discoverable_devices(self):
        """Return the entries of ``devices()`` that are not in ``paired_devices()``."""
        available = await self.devices()
        paired = await self.paired_devices()
        return [d for d in available if d not in paired]

    async def info(self, mac_address):
        """Return a dict built from the output of the 'info' command.

        Every output line that starts with a tab is split at its first space
        into a key and a value; a repeated key keeps the last value seen.
        The original author lists the fields as Name, Alias, Class, Icon,
        Paired, Trusted, Blocked, Connected, LegacyPairing, UUID, Modalias.
        NOTE(review): because the split is on the first space, keys presumably
        keep the trailing colon printed by bluetoothctl -- confirm with callers.

        Raises:
            ValueError: If a tab-indented line contains no space.
        """
        lines = await self._send("info " + mac_address)
        pairs = [line[1:].split(' ', 1) for line in lines if line[:1] == '\t']
        return {key: value for key, value in pairs}

    async def pair(self, mac_address):
        """Pair with the device; True only if 'Pairing successful' was seen."""
        return await self._command_succeeded(
            "pair " + mac_address, 4, "Failed to pair", "Pairing successful")

    async def remove(self, mac_address):
        """Remove the device; True only if 'Device has been removed' was seen."""
        return await self._command_succeeded(
            "remove " + mac_address, 3, "not available", "Device has been removed")

    async def connect(self, mac_address):
        """Connect to the device; True only if 'Connection successful' was seen."""
        return await self._command_succeeded(
            "connect " + mac_address, 2, "Failed to connect", "Connection successful")

    async def disconnect(self, mac_address):
        """Disconnect the device; True only if 'Successful disconnected' was seen."""
        return await self._command_succeeded(
            "disconnect " + mac_address, 2, "Failed to disconnect", "Successful disconnected")
async def main():
    """Demo run: power on, scan for ten seconds, list devices, connect to an Xbox controller.

    Prints progress to stdout and returns None. The connection message reflects
    the boolean returned by ``connect`` instead of assuming success.
    """
    print('Starting async test')
    proxy = AsyncBluetoothctlTextProxy()

    # Turn everything on
    await proxy.power_on()
    await proxy.agent_on()
    await proxy.scan_on()

    # Wait for a while to let the scan populate
    print('Scanning for 10 seconds')
    await asyncio.sleep(10)

    # List and print devices
    print('Available devices:')
    devices = await proxy.devices()
    for mac, name in devices:
        print(f' {mac} {name}')

    # Example on how to pair-- in this case I know I'm pairing with an Xbox controller
    controller = [mac for mac, name in devices if name == 'Xbox Wireless Controller']
    if len(controller) != 1:
        print('Xbox controller not found')
        return

    # connect() returns False when the success message was not seen.
    if await proxy.connect(controller[0]):
        print('Connected to device')
    else:
        print('Failed to connect to device')
if __name__ == "__main__":
    # Script entry point: drive the demo coroutine to completion on the default loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment