This script requires python3.7 or greater.
pip install bleak
pip install crcmod
python3 research_logger.py --scan
python3 research_logger.py --log a-log-file.log 00:11:22:33:44:55
import argparse | |
import asyncio | |
import base64 | |
import itertools | |
import json | |
import re | |
import struct | |
import textwrap | |
import time | |
import crcmod.predefined | |
from bleak import BleakClient, BleakScanner, BleakError | |
modbus_crc = crcmod.predefined.mkCrcFun('modbus') | |
class DeviceCommand: | |
def __init__(self, cmd: bytes): | |
self.cmd = cmd | |
"""Returns the expected response size in bytes""" | |
def response_size(self) -> int: | |
pass | |
"""Provide an iter implemention so that bytes(cmd) works""" | |
def __iter__(self): | |
return iter(self.cmd) | |
class QueryRangeCommand(DeviceCommand): | |
def __init__(self, page: int, offset: int, length: int): | |
self.page = page | |
self.offset = offset | |
self.length = length | |
cmd = bytearray(8) | |
cmd[0] = 1 # Standard prefix | |
cmd[1] = 3 # Range query command | |
struct.pack_into('!BBH', cmd, 2, page, offset, length) | |
struct.pack_into('<H', cmd, -2, modbus_crc(cmd[:-2])) | |
super().__init__(cmd) | |
def response_size(self): | |
# 3 byte header | |
# each returned field is actually 2 bytes | |
# 2 byte crc | |
return 2 * self.length + 5 | |
def __repr__(self): | |
return f'QueryRangeCommand(page={self.page:#04x}, offset={self.offset:#04x}, length={self.length:#04x})' | |
class ParseError(Exception): | |
pass | |
# Triggers a re-connect | |
class BadConnectionError(Exception): | |
pass | |
class BluetoothPowerStation: | |
RESPONSE_TIMEOUT = 5 | |
WRITE_UUID = '0000ff02-0000-1000-8000-00805f9b34fb' | |
NOTIFY_UUID = '0000ff01-0000-1000-8000-00805f9b34fb' | |
current_command: DeviceCommand | |
notify_future: asyncio.Future | |
notify_data: bytearray | |
def __init__(self, address: str): | |
self.address = address | |
self.client = BleakClient(address) | |
self.command_queue = asyncio.Queue() | |
self.notify_future = None | |
self.loop = asyncio.get_running_loop() | |
@property | |
def is_connected(self): | |
return self.client.is_connected | |
async def perform(self, cmd: DeviceCommand): | |
future = self.loop.create_future() | |
await self.command_queue.put((cmd, future)) | |
return future | |
async def perform_nowait(self, cmd: DeviceCommand): | |
await self.command_queue.put((cmd, None)) | |
async def run(self): | |
while True: | |
try: | |
await self.client.connect() | |
await self.client.start_notify(self.NOTIFY_UUID, self._notification_handler) | |
await self._perform_commands(self.client) | |
except (BleakError, asyncio.TimeoutError): | |
continue | |
except BadConnectionError as err: | |
# Something went wrong somewhere | |
await self.client.disconnect() | |
await asyncio.sleep(1) | |
finally: | |
await self.client.disconnect() | |
async def _perform_commands(self, client): | |
while client.is_connected: | |
cmd, cmd_future = await self.command_queue.get() | |
retries = 0 | |
while retries < 5: | |
try: | |
# Prepare to make request | |
self.current_command = cmd | |
self.notify_future = self.loop.create_future() | |
self.notify_data = bytearray() | |
# Make request | |
await client.write_gatt_char(self.WRITE_UUID, self.current_command, True) | |
# Wait for response | |
res = await asyncio.wait_for(self.notify_future, timeout=self.RESPONSE_TIMEOUT) | |
if cmd_future: | |
# TODO: Parse result | |
cmd_future.set_result(res) | |
# Success! | |
break | |
except ParseError: | |
# For safety, wait the full timeout before retrying again | |
retries += 1 | |
await asyncio.sleep(self.RESPONSE_TIMEOUT) | |
except asyncio.TimeoutError: | |
retries += 1 | |
except BleakError as err: | |
if cmd_future: | |
cmd_future.set_exception(err) | |
# Don't retry | |
break | |
except BadConnectionError as err: | |
# Exit command loop | |
if cmd_future: | |
cmd_future.set_exception(err) | |
self.command_queue.task_done() | |
raise | |
if retries == 5: | |
err = BadConnectionError('too many retries') | |
if cmd_future: | |
cmd_future.set_exception(err) | |
self.command_queue.task_done() | |
raise err | |
else: | |
self.command_queue.task_done() | |
def _notification_handler(self, _sender: int, data: bytearray): | |
# Ignore notifications we don't expect | |
if not self.notify_future or self.notify_future.done(): | |
return | |
# If something went wrong, we might get weird data. | |
if data == b'AT+NAME?\r' or data == b'AT+ADV?\r': | |
self.notify_future.set_exception(BadConnectionError('Got AT+ notification')) | |
return | |
# Save data | |
self.notify_data.extend(data) | |
# Check if we're done reading the data we expected | |
if len(self.notify_data) == self.current_command.response_size(): | |
# Validate the CRC | |
crc = modbus_crc(self.notify_data[0:-2]).to_bytes(2, byteorder='little') | |
if self.notify_data[-2:] == crc: | |
self.notify_future.set_result(self.notify_data) | |
else: | |
self.notify_future.set_exception(ParseError('Failed checksum')) | |
async def scan(): | |
print('Scanning....') | |
devices = await BleakScanner.discover() | |
if len(devices) == 0: | |
print('0 devices found - this is a likely sign that something went wrong') | |
else: | |
prefix = re.compile('^(AC200M|AC300|EP500P|EP500)\d+$') | |
bluetti_devices = [d for d in devices if prefix.match(d.name)] | |
for d in bluetti_devices: | |
print(f'Found {d.name}: address {d.address}') | |
def log_packet(output, data, command): | |
log_entry = { | |
'type': 'client', | |
'time': time.strftime('%Y-%m-%d %H:%M:%S %z', time.localtime()), | |
'data': base64.b64encode(data).decode('ascii'), | |
'command': base64.b64encode(bytes(command)).decode('ascii'), | |
} | |
output.write(json.dumps(log_entry) + '\n') | |
async def log(address, path): | |
print(f'Connecting to {address}...') | |
device = BluetoothPowerStation(address) | |
asyncio.get_running_loop().create_task(device.run()) | |
commands = [ | |
QueryRangeCommand(0x00, 0x00, 0x46), | |
QueryRangeCommand(0x00, 0x46, 0x42), | |
QueryRangeCommand(0x00, 0x88, 0x4a), | |
QueryRangeCommand(0x0B, 0xB9, 0x3D) | |
] | |
with open(path, 'a') as log_file: | |
for command in itertools.cycle(commands): | |
if not device.is_connected: | |
await asyncio.sleep(1) | |
continue | |
result_future = await device.perform(command) | |
try: | |
result = await result_future | |
log_packet(log_file, result, command) | |
except ParseError: | |
print('Got a parse exception...') | |
except BadConnectionError as err: | |
print(f'Needed to disconnect due to error: {err}') | |
parser = argparse.ArgumentParser( | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
description='Scans for Bluetti devices and logs information', | |
epilog=textwrap.dedent("""\ | |
To use, run the scanner first: | |
%(prog)s --scan | |
Once you have found your device you can run the logger: | |
%(prog)s --log log-file.log 00:11:22:33:44:55 | |
""")) | |
parser.add_argument( | |
'--scan', | |
action='store_true', | |
help='Scans for devices and prints out addresses') | |
parser.add_argument( | |
'--log', | |
metavar='PATH', | |
help='Connect and log data for the device to the given file') | |
parser.add_argument( | |
'address', | |
metavar='ADDRESS', | |
nargs='?', | |
help='The device MAC to connect to for logging') | |
args = parser.parse_args() | |
if args.scan: | |
asyncio.run(scan()) | |
elif args.log: | |
asyncio.run(log(args.address, args.log)) | |
else: | |
parser.print_help() |