- Written in MicroPython
- MQTT Library: https://github.com/peterhinch/micropython-mqtt/blob/master/mqtt_as/README.md
License: ISC
License: ISC
# type: ignore | |
from struct import unpack | |
import bluetooth | |
# Manufacturer (company) identifier as two bytes. The MQTT script in this file
# imports it, although its scanner compares the advertised ID with 0x09c7.
_CPT_COMPANY_ID = b'\x09\xc7'
# Product-type byte that decode_cpt_data() requires as the first payload byte.
_CPT_PRODUCT_TYPE = b'\x01'
# UUIDs, presumably of the probe-status GATT service and characteristic;
# nothing visible in this file references them -- TODO confirm against the
# device specification.
_CPT_PROBE_STATUS_SERVICE_UUID = bluetooth.UUID("00000100-CAAB-3792-3D44-97AE51C1407A")
_CPT_PROBE_STATUS_UUID = bluetooth.UUID("00000101-CAAB-3792-3D44-97AE51C1407A")
def extract_temp_data(temps, mode):
    """Unpack 13-bit raw readings from `temps` and scale them.

    Args:
        temps: indexable sequence of at least 13 byte values.  The last byte
            (index 12) holds the least significant bits of the first reading.
        mode: 0 decodes all eight packed readings; any other value decodes
            only the reading stored in the low 13 bits (bytes 11 and 12).

    Returns:
        A list of floats, each computed as round(raw * 0.05 - 20, 2).  The
        list has eight entries for mode 0 and one entry otherwise.
    """
    if mode == 0:
        raw_values = []
        window = 0    # bits fetched from `temps` but not yet consumed
        pending = 0   # number of valid bits currently held in `window`
        cursor = 12   # next byte to fetch, walking from the tail to index 0
        for _ in range(8):
            # Top the window up until it holds one complete 13-bit field.
            while pending < 13:
                window |= (temps[cursor] & 0xFF) << pending
                pending += 8
                cursor -= 1
            raw_values.append(window & 0x1FFF)
            window >>= 13
            pending -= 13
    else:
        raw_values = [((temps[11] & 0x1F) << 8) | (temps[12] & 0xFF)]
    return [round((raw * 0.05) - 20, 2) for raw in raw_values]
@micropython.asm_thumb
def reverse(r0, r1):  # bytearray, len(bytearray)
    # Reverse, in place, the r1 bytes that start at address r0.
    # r4 walks down from the last byte while r0 walks up from the first;
    # r5 and r6 hold the two bytes being exchanged.
    #
    # With 0 or 1 bytes there is nothing to swap.  The loop below runs its
    # body before testing, so a zero length would read and write one byte
    # before the buffer; leave early instead.
    cmp(r1, 1)
    ble(DONE)
    add(r4, r0, r1)
    sub(r4, 1)  # end address
    label(LOOP)
    ldrb(r5, [r0, 0])
    ldrb(r6, [r4, 0])
    strb(r6, [r0, 0])
    strb(r5, [r4, 0])
    add(r0, 1)
    sub(r4, 1)
    cmp(r4, r0)
    bpl(LOOP)  # keep going while the end pointer has not passed the start pointer
    label(DONE)
def reverse_bytes(input):
    """Return a new bytearray holding the bytes of `input` in reverse order.

    Args:
        input: anything accepted by bytearray() (bytes, bytearray, list of
            ints, ...).  The argument itself is not modified.

    Returns:
        A fresh bytearray with the same length as `input`.
    """
    data = bytearray(input)
    # Nothing to reverse for an empty buffer; this also avoids handing a
    # zero-length buffer to the in-place assembler routine.
    if data:
        reverse(data, len(data))
    return data
def decode_probe_status_data(data):
    """Decode a 30-byte probe status record and print its fields.

    The record is unpacked as two little-endian 32-bit values, 13 bytes of
    packed readings, a mode/id byte, a battery/virtual-sensor byte and seven
    bytes of prediction status.

    Args:
        data: the raw status bytes.

    Returns:
        The list produced by extract_temp_data() when `data` is exactly 30
        bytes long, otherwise None.
    """
    print(data)
    if len(data) != 30:
        return None

    (log_min, log_max, packed_temps, model_and_id,
     battery_status_and_virtual_sensors, prediction_status) = unpack('<LL13sss7s', data)

    raw_temp = reverse_bytes(packed_temps)
    mode = model_and_id[0] & 0x3
    print(model_and_id.hex())
    real_temps = extract_temp_data(raw_temp, mode)

    # Prediction status is a bit-packed record; name its bytes once and slice
    # the fields out of them, lowest bits first.
    b0, b1, b2, b3, b4, b5, b6 = prediction_status
    prediction_state = b0 & 0xf
    prediction_mode = (b0 >> 4) & 0x3
    prediction_type = (b0 >> 6) & 0x3
    set_point = ((b2 & 0x03) << 8) | b1
    heat_start = ((b3 & 0x0f) << 6) | ((b2 & 0xfc) >> 2)
    seconds = ((b5 & 0x1F) << 12) | (b4 << 4) | ((b3 & 0xF0) >> 4)
    raw_core = (b6 << 3) | ((b5 & 0xE0) >> 5)

    print(real_temps)
    print(prediction_state, prediction_mode, prediction_type, set_point, heat_start, seconds, raw_core)
    return real_temps
def decode_cpt_data(data):
    """Decode a 22-byte manufacturer-data payload into a dict of probe fields.

    Args:
        data: the payload bytes (product type, serial number, packed
            readings, mode/id byte, battery/virtual-sensor byte, 2 reserved
            bytes).

    Returns:
        None when `data` is not 22 bytes long or its first byte differs from
        _CPT_PRODUCT_TYPE.  Otherwise a dict with the keys 'serial_number',
        'mode', 'color', 'probe_id', 'battery_status', 'virtual_core_sensor',
        'virtual_surface_sensor', 'virtual_amibent_sensor' and 'temps'; in
        Normal mode (0) it additionally carries 'vcs_temp', 'vss_temp' and
        'vas_temp'.  The 'amibent' spelling is kept because the keys are part
        of the emitted data.
    """
    if len(data) != 22:
        return None

    (product_type, serial, packed_temps, model_and_id,
     battery_status_and_virtual_sensors, _reserved) = unpack('<s4s13sss2s', data)
    if product_type != _CPT_PRODUCT_TYPE:
        return None

    serial_number = reverse_bytes(serial)
    raw_temp = reverse_bytes(packed_temps)

    # mode is bit 1-2 from model_and_id
    # 0: Normal, 1: Instant Read, 2: Reserved, 3: Error
    modes = ['Normal', 'Instant Read', 'Reserved', 'Error']
    mode = model_and_id[0] & 0x3
    real_temps = extract_temp_data(raw_temp, mode)

    # color_id is bit 3-5 from model_and_id
    # 0: Yellow, 1: Grey, 2-7: TBD
    # Eight entries so that every 3-bit value has a label.
    colors = ['Yellow', 'Grey', 'TBD', 'TBD', 'TBD', 'TBD', 'TBD', 'TBD']
    color_id = (model_and_id[0] >> 0x2) & 0x7

    # probe_id is bit 6-8 from model_and_id
    probe_id = (model_and_id[0] >> 0x5) & 0x7

    # battery_status is bit 1 from battery_status_and_virtual_sensors
    # 0: OK, 1: Low
    battery_statues = ['OK', 'Low']
    battery_status = battery_status_and_virtual_sensors[0] & 0x1

    virtual_sensors = battery_status_and_virtual_sensors[0] >> 0x1

    # virtual_core_sensor is bit 2-4 from battery_status_and_virtual_sensors
    # 0: T1 ... 5: T6
    virtual_core_sensors = ['T1', 'T2', 'T3', 'T4', 'T5', 'T6']
    virtual_core_sensor = virtual_sensors & 0x7
    # The field is 3 bits wide but only six values are named; report the
    # other two as 'Unknown' instead of raising IndexError on received data.
    if virtual_core_sensor < len(virtual_core_sensors):
        virtual_core_label = virtual_core_sensors[virtual_core_sensor]
    else:
        virtual_core_label = 'Unknown'

    # virtual_surface_sensor is bit 5-6 from battery_status_and_virtual_sensors
    # 0: T4, 1: T5, 2: T6, 3: T7
    virtual_surface_sensors = ['T4', 'T5', 'T6', 'T7']
    # Shift (not compare) so that both bits of the field are extracted.
    virtual_surface_sensor = (virtual_sensors >> 0x3) & 0x3

    # virtual_amibent_sensor is bit 7-8 from battery_status_and_virtual_sensors
    # 0: T5, 1: T6, 2: T7, 3: T8
    virtual_amibent_sensors = ['T5', 'T6', 'T7', 'T8']
    virtual_amibent_sensor = (virtual_sensors >> 0x5) & 0x3

    result = {
        'serial_number': serial_number.hex(),
        'mode': modes[mode],
        'color': colors[color_id],
        'probe_id': probe_id,
        'battery_status': battery_statues[battery_status],
        'virtual_core_sensor': virtual_core_label,
        'virtual_surface_sensor': virtual_surface_sensors[virtual_surface_sensor],
        'virtual_amibent_sensor': virtual_amibent_sensors[virtual_amibent_sensor],
        'temps': real_temps
    }
    if mode == 0:
        # Normal mode yields eight readings, so these indexes are in range.
        result['vcs_temp'] = real_temps[virtual_core_sensor]
        result['vss_temp'] = real_temps[virtual_surface_sensor + 3]
        result['vas_temp'] = real_temps[virtual_amibent_sensor + 4]
    return result
import aioble | |
import json | |
import ssl | |
from mqtt_as import MQTTClient, config | |
from cpt import _CPT_COMPANY_ID, decode_cpt_data | |
import uasyncio as asyncio | |
import ntptime | |
from machine import WDT | |
# Timeout used by ntptime requests (presumably seconds -- confirm against the
# ntptime module).
ntptime.timeout = 10
# Let's Encrypt Authority
ca_certificate_path = "/certs/isrgrootx1.der"
# Read the DER-encoded CA certificate once at start-up; it is handed to the
# TLS layer through config['ssl_params'] below.
print('Loading CA Certificate')
with open(ca_certificate_path, 'rb') as f:
    cacert = f.read()
print('Obtained CA Certificate')
# Broker, Wi-Fi and MQTT credentials.  The values look like placeholders and
# need to be replaced for a real deployment.
config['server'] = 'mqtt.internal.local'
config['ssid'] = 'WI-FI'
config['wifi_pw'] = 'PASSWORD'
config['user'] = 'MQTT_USER'
config['password'] = 'MQTT_PASSWORD'
config["queue_len"] = 1
# (Optional) Enable SSL/TLS support for the MQTT client
config['ssl'] = True
# Require a server certificate that validates against the CA loaded above;
# 'server_hostname' repeats config['server'] and must be kept in sync with it.
config['ssl_params'] = {'server_hostname': 'mqtt.internal.local', 'cadata': cacert, 'cert_reqs': ssl.CERT_REQUIRED}
client = MQTTClient(config)
async def up(client):  # Respond to connectivity being (re)established
    """Loop forever: wait for `client.up` to be set, then clear it.

    Nothing else is done when the event fires; the coroutine never returns.
    """
    while 1:
        # Block until the event is set ...
        await client.up.wait()
        # ... then re-arm it so the next wait blocks again.
        client.up.clear()
async def find_cpt():
    """Scan BLE advertisements and return the first decodable probe reading.

    Runs one active aioble scan (first argument 1000, interval and window
    30000 us) and inspects the manufacturer data of every result.  Frames
    from manufacturer 0x09c7 are passed to decode_cpt_data().

    Returns:
        The dict from decode_cpt_data() for the first frame it accepts, or
        None when the scan ends without one.
    """
    async with aioble.scan(1000, interval_us=30000, window_us=30000, active=True) as scanner:
        async for result in scanner:
            for m, d in result.manufacturer():
                if m != 0x09c7:
                    continue
                decoded = decode_cpt_data(d)
                # A frame that does not decode (wrong length or product
                # type) must not end the scan: keep listening so a valid
                # frame seen later in the same scan is still picked up.
                if decoded is not None:
                    return decoded
    return None
# Watchdog instance; created inside main() once the connections are up.
wdt = None


async def main(client):
    """Bring up Wi-Fi, clock and MQTT, then publish probe readings forever.

    Each pass of the loop runs one scan via find_cpt(), publishes a truthy
    result as JSON to 'cpt/<serial_number>' with qos 1, feeds the watchdog
    and sleeps for one second.

    Args:
        client: the MQTT client used for Wi-Fi, connection and publishing.
    """
    global wdt

    print ("Connecting to WiFi")
    await client.wifi_connect()

    print ("Updating system time")
    ntptime.settime()

    print ("Connecting to MQTT")
    await client.connect()
    print ("Connected")

    asyncio.create_task(up(client))

    # Watchdog timer: started only after the slow start-up steps above.
    wdt = WDT(timeout=5000)

    while True:
        reading = await find_cpt()
        if reading:
            topic = f'cpt/{reading["serial_number"]}'
            payload = json.dumps(reading)
            await client.publish(topic, payload, qos=1)
        wdt.feed()
        await asyncio.sleep(1)
# Script entry point: run main() until it exits or raises, and close the MQTT
# client in either case.
try:
    asyncio.run(main(client))
finally:
    client.close()
Dude, I was looking all over GitHub for something like this. Note to self: also look at Gists!!!
Thank you for this.