Example of Custom Bluetti Polling Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import sys

from bleak import BleakError
from bluetti_mqtt.bluetooth import (
    BadConnectionError,
    BluetoothClient,
    ParseError,
    check_addresses,
)
async def monitor_device(address):
    """Connect to the device at *address* and print its polled data forever.

    The address is resolved with ``check_addresses``; the first match is
    used. A ``BluetoothClient`` is started as a background task, and once it
    reports a connection every command in ``device.polling_commands`` is
    sent in an endless loop, with each parsed response printed to stdout.

    Args:
        address: Bluetooth address of the device to monitor.

    Raises:
        SystemExit: If ``check_addresses`` finds no matching device.

    This coroutine never returns normally; stop it by cancelling the task
    or interrupting the program.
    """
    devices = await check_addresses({address})
    if not devices:
        sys.exit('Could not find the given device to connect to')
    device = devices[0]

    print(f'Connecting to {device.address}')
    client = BluetoothClient(device.address)
    # Keep a strong reference to the background task for the lifetime of
    # this coroutine: the event loop only holds weak references to tasks,
    # so an unreferenced task may be garbage-collected while still running.
    run_task = asyncio.get_running_loop().create_task(client.run())  # noqa: F841

    # Wait for the client to report a connection, checking once per second.
    while not client.is_connected:
        print('Waiting for connection...')
        await asyncio.sleep(1)

    # Poll device
    while True:
        for command in device.polling_commands:
            response_future = await client.perform(command)
            try:
                response = await response_future
                parsed = device.parse(command.page, command.offset, response.body)
                print(parsed)  # <-- This is the thing you want to save
            except (BadConnectionError, BleakError, ParseError) as err:
                # A failed command is reported and skipped so that polling
                # continues with the next command.
                print(f'Got an error running command {command}: {err}')
if __name__ == "__main__":
    # The device address is a required positional argument. Check for it
    # up front so a missing argument yields a usage message rather than a
    # bare IndexError traceback. Extra arguments are ignored, as before.
    if len(sys.argv) < 2:
        sys.exit(f'Usage: {sys.argv[0]} <device address>')
    asyncio.run(monitor_device(sys.argv[1]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment