Skip to content

Instantly share code, notes, and snippets.

@zetxx
Last active October 4, 2023 09:55
Show Gist options
  • Save zetxx/6f141bfcfe553d264a9aa3d608176cfe to your computer and use it in GitHub Desktop.
Save zetxx/6f141bfcfe553d264a9aa3d608176cfe to your computer and use it in GitHub Desktop.
adax-loca-init
# docs https://adax.no/wi-fi/api-development/#local
# registers heater to your wifi network
# install dependencies with: pip install bleak requests
import bleak
import asyncio
import operator
import urllib.parse
wifi_ssid = 'replace with your wifi name' # replace with your wifi name
wifi_psk = 'replace with your wifi psk' # replace with your wifi psk
access_token = '12345678' # generate some strong password for your heater
UUID_ADAX_BLE_SERVICE = '3885cc10-7c18-4ad4-a48d-bf11abf7cb92'
UUID_ADAX_BLE_SERVICE_CHARACTERISTIC_COMMAND = '0000cc11-0000-1000-8000-00805f9b34fb'
# ignore registered, non heater devices
def is_available_device(manufacturer_data):
ADAX_DEVICE_TYPE_HEATER_BLE = 5
if manufacturer_data and len(manufacturer_data) >= 10:
type_id = manufacturer_data[0]
status_byte = manufacturer_data[1]
mac_id = 0
for byte in manufacturer_data[2:10]:
mac_id = mac_id * 256 + byte
registered = status_byte & (0x1 << 0)
managed = status_byte & (0x1 << 1)
return mac_id and type_id == ADAX_DEVICE_TYPE_HEATER_BLE and not registered and not managed
return False
# scan and return available heater
async def scan_for_any_available_ble_device():
print("scan_for_any_available_ble_device")
discovered = await bleak.discover(timeout = 60)
if discovered:
for discovered_item in discovered:
metadata = discovered_item.metadata
print(metadata)
uuids = metadata['uuids'] if 'uuids' in metadata else None
if uuids and UUID_ADAX_BLE_SERVICE in uuids:
manufacturer_data_list = None
manufacturer_data = metadata['manufacturer_data'] if 'manufacturer_data' in metadata else None
if manufacturer_data:
first_bytes = next(iter(manufacturer_data))
if first_bytes:
other_bytes = manufacturer_data[first_bytes]
manufacturer_data_list = [first_bytes % 256, operator.floordiv(first_bytes, 256)] + list(other_bytes)
if is_available_device(manufacturer_data_list):
return discovered_item.address
return None
# handle notifications for heater
def command_notification_handler(uuid, data):
BLE_COMMAND_STATUS_OK = 0
BLE_COMMAND_STATUS_INVALID_WIFI = 1
byte_list = None if not data else list(data)
status = None if not byte_list else byte_list[0]
if status == BLE_COMMAND_STATUS_OK:
ip = None if not byte_list or len(byte_list) < 5 else '%d.%d.%d.%d' % (byte_list[1], byte_list[2], byte_list[3], byte_list[4])
print('Heater Registered, use client_control_demo with IP: %s and token: %s' % (ip, access_token))
elif status == BLE_COMMAND_STATUS_INVALID_WIFI:
print('Invalid WiFi crendentials')
# send command to heater
async def write_command(command_byte_list, client):
MAX_BYTES_IN_COMMAND_CHUNK = 17
byte_count = len(command_byte_list)
chunk_count = operator.floordiv(byte_count, MAX_BYTES_IN_COMMAND_CHUNK)
if chunk_count * MAX_BYTES_IN_COMMAND_CHUNK < byte_count:
chunk_count += 1
sent_byte_count = 0
chunk_nr = 0
while chunk_nr < chunk_count:
is_last = chunk_nr == (chunk_count - 1)
chunk_data_length = byte_count - sent_byte_count if is_last else MAX_BYTES_IN_COMMAND_CHUNK
chunk = [chunk_nr, 1 if is_last else 0] + command_byte_list[sent_byte_count:(sent_byte_count + chunk_data_length)]
await client.write_gatt_char(UUID_ADAX_BLE_SERVICE_CHARACTERISTIC_COMMAND, bytearray(chunk))
sent_byte_count += chunk_data_length
chunk_nr += 1
# register heater
async def register_devices(loop):
print('Press and hold OK button of the heater until the blue led starts blinking')
device = await scan_for_any_available_ble_device()
if device:
client = bleak.BleakClient(device, loop = loop)
await client.connect()
await client.start_notify(UUID_ADAX_BLE_SERVICE_CHARACTERISTIC_COMMAND, command_notification_handler)
ssid_encoded = urllib.parse.quote(wifi_ssid)
psk_encoded = urllib.parse.quote(wifi_psk)
access_token_encoded = urllib.parse.quote(access_token)
byte_list = list(bytearray('command=join&ssid=' + ssid_encoded + '&psk=' + psk_encoded + '&token=' + access_token_encoded, 'ascii'))
await write_command(byte_list, client)
loop = asyncio.get_event_loop()
loop.create_task(register_devices(loop))
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment