Skip to content

Instantly share code, notes, and snippets.

@warhammerkid
Created March 25, 2023 23:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save warhammerkid/bb00ec3bba098587e7157724b79de22a to your computer and use it in GitHub Desktop.
Save warhammerkid/bb00ec3bba098587e7157724b79de22a to your computer and use it in GitHub Desktop.
Example of Custom Bluetti Polling Script
import asyncio
from bleak import BleakError
import sys
from bluetti_mqtt.bluetooth import (check_addresses, BluetoothClient, ParseError, BadConnectionError)
async def monitor_device(address):
devices = await check_addresses({address})
if len(devices) == 0:
sys.exit('Could not find the given device to connect to')
device = devices[0]
print(f'Connecting to {device.address}')
client = BluetoothClient(device.address)
asyncio.get_running_loop().create_task(client.run())
while not client.is_connected:
print('Waiting for connection...')
await asyncio.sleep(1)
continue
# Poll device
while True:
for command in device.polling_commands:
response_future = await client.perform(command)
try:
response = await response_future
parsed = device.parse(command.page, command.offset, response.body)
print(parsed) # <-- This is the thing you want to save
except (BadConnectionError, BleakError, ParseError) as err:
print(f'Got an error running command {command}: {err}')
if __name__ == "__main__":
asyncio.run(monitor_device(sys.argv[1]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment