Skip to content

Instantly share code, notes, and snippets.

@warhammerkid
Last active December 14, 2022 12:21
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save warhammerkid/4db0298120a6a280112e4cc333c2ae45 to your computer and use it in GitHub Desktop.
Save warhammerkid/4db0298120a6a280112e4cc333c2ae45 to your computer and use it in GitHub Desktop.
Bluetti Bluetooth Logger

This script requires python3.7 or greater.

pip install bleak
pip install crcmod

python3 research_logger.py --scan
python3 research_logger.py --log a-log-file.log 00:11:22:33:44:55
import argparse
import asyncio
import base64
import itertools
import json
import re
import struct
import textwrap
import time
import crcmod.predefined
from bleak import BleakClient, BleakScanner, BleakError
modbus_crc = crcmod.predefined.mkCrcFun('modbus')
class DeviceCommand:
def __init__(self, cmd: bytes):
self.cmd = cmd
"""Returns the expected response size in bytes"""
def response_size(self) -> int:
pass
"""Provide an iter implemention so that bytes(cmd) works"""
def __iter__(self):
return iter(self.cmd)
class QueryRangeCommand(DeviceCommand):
def __init__(self, page: int, offset: int, length: int):
self.page = page
self.offset = offset
self.length = length
cmd = bytearray(8)
cmd[0] = 1 # Standard prefix
cmd[1] = 3 # Range query command
struct.pack_into('!BBH', cmd, 2, page, offset, length)
struct.pack_into('<H', cmd, -2, modbus_crc(cmd[:-2]))
super().__init__(cmd)
def response_size(self):
# 3 byte header
# each returned field is actually 2 bytes
# 2 byte crc
return 2 * self.length + 5
def __repr__(self):
return f'QueryRangeCommand(page={self.page:#04x}, offset={self.offset:#04x}, length={self.length:#04x})'
class ParseError(Exception):
pass
# Triggers a re-connect
class BadConnectionError(Exception):
pass
class BluetoothPowerStation:
RESPONSE_TIMEOUT = 5
WRITE_UUID = '0000ff02-0000-1000-8000-00805f9b34fb'
NOTIFY_UUID = '0000ff01-0000-1000-8000-00805f9b34fb'
current_command: DeviceCommand
notify_future: asyncio.Future
notify_data: bytearray
def __init__(self, address: str):
self.address = address
self.client = BleakClient(address)
self.command_queue = asyncio.Queue()
self.notify_future = None
self.loop = asyncio.get_running_loop()
@property
def is_connected(self):
return self.client.is_connected
async def perform(self, cmd: DeviceCommand):
future = self.loop.create_future()
await self.command_queue.put((cmd, future))
return future
async def perform_nowait(self, cmd: DeviceCommand):
await self.command_queue.put((cmd, None))
async def run(self):
while True:
try:
await self.client.connect()
await self.client.start_notify(self.NOTIFY_UUID, self._notification_handler)
await self._perform_commands(self.client)
except (BleakError, asyncio.TimeoutError):
continue
except BadConnectionError as err:
# Something went wrong somewhere
await self.client.disconnect()
await asyncio.sleep(1)
finally:
await self.client.disconnect()
async def _perform_commands(self, client):
while client.is_connected:
cmd, cmd_future = await self.command_queue.get()
retries = 0
while retries < 5:
try:
# Prepare to make request
self.current_command = cmd
self.notify_future = self.loop.create_future()
self.notify_data = bytearray()
# Make request
await client.write_gatt_char(self.WRITE_UUID, self.current_command, True)
# Wait for response
res = await asyncio.wait_for(self.notify_future, timeout=self.RESPONSE_TIMEOUT)
if cmd_future:
# TODO: Parse result
cmd_future.set_result(res)
# Success!
break
except ParseError:
# For safety, wait the full timeout before retrying again
retries += 1
await asyncio.sleep(self.RESPONSE_TIMEOUT)
except asyncio.TimeoutError:
retries += 1
except BleakError as err:
if cmd_future:
cmd_future.set_exception(err)
# Don't retry
break
except BadConnectionError as err:
# Exit command loop
if cmd_future:
cmd_future.set_exception(err)
self.command_queue.task_done()
raise
if retries == 5:
err = BadConnectionError('too many retries')
if cmd_future:
cmd_future.set_exception(err)
self.command_queue.task_done()
raise err
else:
self.command_queue.task_done()
def _notification_handler(self, _sender: int, data: bytearray):
# Ignore notifications we don't expect
if not self.notify_future or self.notify_future.done():
return
# If something went wrong, we might get weird data.
if data == b'AT+NAME?\r' or data == b'AT+ADV?\r':
self.notify_future.set_exception(BadConnectionError('Got AT+ notification'))
return
# Save data
self.notify_data.extend(data)
# Check if we're done reading the data we expected
if len(self.notify_data) == self.current_command.response_size():
# Validate the CRC
crc = modbus_crc(self.notify_data[0:-2]).to_bytes(2, byteorder='little')
if self.notify_data[-2:] == crc:
self.notify_future.set_result(self.notify_data)
else:
self.notify_future.set_exception(ParseError('Failed checksum'))
async def scan():
print('Scanning....')
devices = await BleakScanner.discover()
if len(devices) == 0:
print('0 devices found - this is a likely sign that something went wrong')
else:
prefix = re.compile('^(AC200M|AC300|EP500P|EP500)\d+$')
bluetti_devices = [d for d in devices if prefix.match(d.name)]
for d in bluetti_devices:
print(f'Found {d.name}: address {d.address}')
def log_packet(output, data, command):
log_entry = {
'type': 'client',
'time': time.strftime('%Y-%m-%d %H:%M:%S %z', time.localtime()),
'data': base64.b64encode(data).decode('ascii'),
'command': base64.b64encode(bytes(command)).decode('ascii'),
}
output.write(json.dumps(log_entry) + '\n')
async def log(address, path):
print(f'Connecting to {address}...')
device = BluetoothPowerStation(address)
asyncio.get_running_loop().create_task(device.run())
commands = [
QueryRangeCommand(0x00, 0x00, 0x46),
QueryRangeCommand(0x00, 0x46, 0x42),
QueryRangeCommand(0x00, 0x88, 0x4a),
QueryRangeCommand(0x0B, 0xB9, 0x3D)
]
with open(path, 'a') as log_file:
for command in itertools.cycle(commands):
if not device.is_connected:
await asyncio.sleep(1)
continue
result_future = await device.perform(command)
try:
result = await result_future
log_packet(log_file, result, command)
except ParseError:
print('Got a parse exception...')
except BadConnectionError as err:
print(f'Needed to disconnect due to error: {err}')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description='Scans for Bluetti devices and logs information',
epilog=textwrap.dedent("""\
To use, run the scanner first:
%(prog)s --scan
Once you have found your device you can run the logger:
%(prog)s --log log-file.log 00:11:22:33:44:55
"""))
parser.add_argument(
'--scan',
action='store_true',
help='Scans for devices and prints out addresses')
parser.add_argument(
'--log',
metavar='PATH',
help='Connect and log data for the device to the given file')
parser.add_argument(
'address',
metavar='ADDRESS',
nargs='?',
help='The device MAC to connect to for logging')
args = parser.parse_args()
if args.scan:
asyncio.run(scan())
elif args.log:
asyncio.run(log(args.address, args.log))
else:
parser.print_help()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment