Last active
October 4, 2023 09:55
-
-
Save zetxx/6f141bfcfe553d264a9aa3d608176cfe to your computer and use it in GitHub Desktop.
adax-loca-init
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Docs: https://adax.no/wi-fi/api-development/#local
# Registers a heater on your Wi-Fi network.
# Install dependencies with: pip install bleak requests
import bleak | |
import asyncio | |
import operator | |
import urllib.parse | |
# Wi-Fi network the heater is asked to join.
wifi_ssid = 'replace with your wifi name'  # replace with your wifi name
wifi_psk = 'replace with your wifi psk'  # replace with your wifi psk

# Token sent to the heater in the join command.
access_token = '12345678'  # generate some strong password for your heater

# Bluetooth LE identifiers used to find the heater and write commands to it.
UUID_ADAX_BLE_SERVICE = '3885cc10-7c18-4ad4-a48d-bf11abf7cb92'
UUID_ADAX_BLE_SERVICE_CHARACTERISTIC_COMMAND = '0000cc11-0000-1000-8000-00805f9b34fb'
# Ignore devices that are already registered or are not heaters.
def is_available_device(manufacturer_data):
    """Return True when the advertisement describes a heater free to register.

    Args:
        manufacturer_data: Sequence of byte values. Index 0 is the device
            type, index 1 is a status bit field, and indices 2..9 form the
            device id (most significant byte first). May be None or shorter
            than 10 items, in which case the device is rejected.

    Returns:
        bool: True only if the device id is non-zero, the type is the BLE
        heater type, and neither the "registered" nor the "managed" status
        bit is set. Always a real bool, never a bare int.
    """
    ADAX_DEVICE_TYPE_HEATER_BLE = 5
    STATUS_REGISTERED_BIT = 0x1 << 0
    STATUS_MANAGED_BIT = 0x1 << 1

    if not manufacturer_data or len(manufacturer_data) < 10:
        return False

    type_id = manufacturer_data[0]
    status_byte = manufacturer_data[1]

    # Fold the eight id bytes into a single integer, big-endian.
    mac_id = 0
    for byte in manufacturer_data[2:10]:
        mac_id = mac_id * 256 + byte

    registered = status_byte & STATUS_REGISTERED_BIT
    managed = status_byte & STATUS_MANAGED_BIT

    # bool(...) keeps the result a strict boolean; a plain `and` chain would
    # leak the integer 0 when the device id is zero.
    return bool(
        mac_id
        and type_id == ADAX_DEVICE_TYPE_HEATER_BLE
        and not registered
        and not managed
    )
# Scan for and return an available heater.
async def scan_for_any_available_ble_device():
    """Scan for BLE devices and return the address of the first usable heater.

    A device qualifies when it advertises UUID_ADAX_BLE_SERVICE and its
    manufacturer data passes is_available_device().

    Returns:
        The `address` attribute of the first matching device, or None when
        nothing suitable was discovered.
    """
    print("scan_for_any_available_ble_device")
    # NOTE(review): newer bleak releases expose scanning through
    # BleakScanner.discover -- confirm that bleak.discover and the device
    # `metadata` attribute exist in the installed bleak version.
    found_devices = await bleak.discover(timeout=60)
    for candidate in found_devices or ():
        info = candidate.metadata
        print(info)

        service_uuids = info.get('uuids')
        if not service_uuids or UUID_ADAX_BLE_SERVICE not in service_uuids:
            continue

        payload = None
        vendor_data = info.get('manufacturer_data')
        if vendor_data:
            # The first key of the mapping is split into two leading bytes
            # (low byte first), followed by the bytes stored under that key.
            company_id = next(iter(vendor_data))
            if company_id:
                high_byte, low_byte = divmod(company_id, 256)
                payload = [low_byte, high_byte, *vendor_data[company_id]]

        if is_available_device(payload):
            return candidate.address
    return None
# Handle notifications from the heater.
def command_notification_handler(uuid, data):
    """Report the outcome of a command based on the heater's notification.

    Args:
        uuid: Identifier passed by the notification source; not used here.
        data: Notification payload (bytes-like) or None. Byte 0 is the
            status code; on success bytes 1..4 are printed as a dotted IP
            address.

    Prints a success message (with the IP and the module-level access_token)
    for status 0, and an error message for status 1. Empty payloads and any
    other status are ignored silently.
    """
    BLE_COMMAND_STATUS_OK = 0
    BLE_COMMAND_STATUS_INVALID_WIFI = 1

    byte_list = list(data) if data else None
    status = byte_list[0] if byte_list else None

    if status == BLE_COMMAND_STATUS_OK:
        # The IP is only available when all four address bytes are present.
        if len(byte_list) >= 5:
            ip = '%d.%d.%d.%d' % tuple(byte_list[1:5])
        else:
            ip = None
        print('Heater Registered, use client_control_demo with IP: %s and token: %s' % (ip, access_token))
    elif status == BLE_COMMAND_STATUS_INVALID_WIFI:
        # Message spelling corrected (was 'crendentials').
        print('Invalid WiFi credentials')
# Send a command to the heater.
async def write_command(command_byte_list, client):
    """Write a command to the heater in chunks of at most 17 payload bytes.

    Each chunk is prefixed with two header bytes: the chunk index and a flag
    that is 1 on the final chunk and 0 otherwise.

    Args:
        command_byte_list: List of byte values making up the whole command.
        client: Object providing an awaitable `write_gatt_char(uuid, data)`.

    An empty command results in no writes.
    """
    chunk_size = 17
    offsets = range(0, len(command_byte_list), chunk_size)
    final_index = len(offsets) - 1

    for index, start in enumerate(offsets):
        last_flag = 1 if index == final_index else 0
        # Slicing clips at the end of the list, so the final chunk simply
        # carries whatever bytes remain.
        payload = command_byte_list[start:start + chunk_size]
        await client.write_gatt_char(
            UUID_ADAX_BLE_SERVICE_CHARACTERISTIC_COMMAND,
            bytearray([index, last_flag] + payload),
        )
# Register the heater.
async def register_devices(loop):
    """Find an available heater and send it the Wi-Fi join command.

    Args:
        loop: Event loop forwarded to bleak.BleakClient.

    Does nothing further when the scan finds no device. The heater's reply
    is delivered through command_notification_handler.
    """
    print('Press and hold OK button of the heater until the blue led starts blinking')
    address = await scan_for_any_available_ble_device()
    if not address:
        return

    client = bleak.BleakClient(address, loop=loop)
    await client.connect()
    # Subscribe before writing so the reply to the command is not missed.
    await client.start_notify(
        UUID_ADAX_BLE_SERVICE_CHARACTERISTIC_COMMAND,
        command_notification_handler,
    )

    # Percent-encode each value so it cannot break the key=value&... layout.
    command_text = ''.join((
        'command=join&ssid=', urllib.parse.quote(wifi_ssid),
        '&psk=', urllib.parse.quote(wifi_psk),
        '&token=', urllib.parse.quote(access_token),
    ))
    await write_command(list(command_text.encode('ascii')), client)
# Script entry: run the registration task on a dedicated event loop.
# asyncio.get_event_loop() is deprecated when no loop is running and raises
# RuntimeError on recent Python versions, so the loop is created explicitly
# and installed as the current loop for any library code that looks it up.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(register_devices(loop))
# Keep running after the command is written so the heater's notification
# can still be received; stop the script manually (Ctrl+C).
loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment