Skip to content

Instantly share code, notes, and snippets.

@osteele
Last active July 13, 2022 04:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save osteele/926b5229eb51c090fa65597f833da379 to your computer and use it in GitHub Desktop.
Save osteele/926b5229eb51c090fa65597f833da379 to your computer and use it in GitHub Desktop.
List connected BLE devices
import asyncio
import sys
from collections import Counter, defaultdict
from io import StringIO
import asyncclick as click
from bleak import BleakClient, BleakError, BleakScanner
print_device_names = False
@click.command()
@click.option("--alphabetical", default=False, is_flag=True, help="Sort by name")
@click.option("--name-filter")
async def main(name_filter, alphabetical=False):
devices = sorted(await BleakScanner.discover(), key=lambda d: d.name)
if print_device_names:
names = Counter(d.name for d in devices)
print(
"Device names:",
", ".join(
f"{name} (x{count})" if count > 1 else name
for name, count in names.items()
),
)
if name_filter:
devices = [d for d in devices if name_filter in d.name]
if not devices:
print("Device not found")
sys.exit(1)
errors = defaultdict(list)
sep = ""
return_when = asyncio.ALL_COMPLETED if alphabetical else asyncio.FIRST_COMPLETED
pending = [asyncio.create_task(get_device_services(device)) for device in devices]
while pending:
done, pending = await asyncio.wait(pending, return_when=return_when)
for task in done:
result = await task
device = result["device"]
if "error" in result:
errors[result["error"]].append(result)
else:
print(f"{sep}Device: {device.name} ({device.address}):")
print("Services:")
for svc in result["services"]:
print(f" {name_or_description(svc)}:")
print(
"\n".join(
f" {name_or_description(char)}"
for char in svc.characteristics
)
)
sep = "\n"
for error, results in errors.items():
devices = [result["device"] for result in results]
print(f"{error}:")
print(
" ",
", ".join(
device.address
if device.name == "Unknown"
else f"{device.name} ({device.address})"
for device in devices
),
)
async def get_device_services(device):
output = StringIO()
try:
async with BleakClient(device.address) as client:
svcs = await client.get_services()
return dict(device=device, services=svcs, output=output.getvalue())
except BleakError:
return dict(device=device, error="Disconnected")
except (asyncio.exceptions.TimeoutError, asyncio.exceptions.TimeoutError) as ex:
return dict(device=device, error="Cancelled")
def name_or_description(char):
return char.uuid if char.description == "Unknown" else char.description
if __name__ == "__main__":
main()
anyio==3.5.0
asyncclick==8.0.3.2
asyncio==3.4.3
bleak==0.14.2
pyobjc==8.4.1
# dev
black==22.3.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment