Skip to content

Instantly share code, notes, and snippets.

@huangyq23
Last active February 2, 2024 18:51
Show Gist options
  • Save huangyq23/8a21c571d0afc8b8440e5ac19f8aeaeb to your computer and use it in GitHub Desktop.
Save huangyq23/8a21c571d0afc8b8440e5ac19f8aeaeb to your computer and use it in GitHub Desktop.
CPT MQTT
# type: ignore
from struct import unpack
import bluetooth
_CPT_COMPANY_ID = b'\x09\xc7'
_CPT_PRODUCT_TYPE = b'\x01'
_CPT_PROBE_STATUS_SERVICE_UUID = bluetooth.UUID("00000100-CAAB-3792-3D44-97AE51C1407A")
_CPT_PROBE_STATUS_UUID = bluetooth.UUID("00000101-CAAB-3792-3D44-97AE51C1407A")
def extract_temp_data(temps, mode):
if (mode == 0):
tp_list = [0]*8
tp_list[7] = ((temps[0] & 0xFF) << 5) | ((temps[1] & 0xF8) >> 3)
tp_list[6] = ((temps[1] & 0x07) << 10) | ((temps[2] & 0xFF) << 2) | ((temps[3] & 0xC0) >> 6)
tp_list[5] = ((temps[3] & 0x3F) << 7) | ((temps[4] & 0xFE) >> 1)
tp_list[4] = ((temps[4] & 0x01) << 12) | ((temps[5] & 0xFF) << 4) | ((temps[6] & 0xF0) >> 4)
tp_list[3] = ((temps[6] & 0x0F) << 9) | ((temps[7] & 0xFF) << 1) | ((temps[8] & 0x80) >> 7)
tp_list[2] = ((temps[8] & 0x7F) << 6) | ((temps[9] & 0xFC) >> 2)
tp_list[1] = ((temps[9] & 0x03) << 11) | ((temps[10] & 0xFF) << 3) | ((temps[11] & 0xE0) >> 5)
tp_list[0] = ((temps[11] & 0x1F) << 8) | ((temps[12] & 0xFF) >> 0)
else:
tp_list = [0]
tp_list[0] = ((temps[11] & 0x1F) << 8) | ((temps[12] & 0xFF) >> 0)
real_temps = [round((tp * 0.05) - 20, 2) for tp in tp_list]
return real_temps
@micropython.asm_thumb
def reverse(r0, r1): # bytearray, len(bytearray)
add(r4, r0, r1)
sub(r4, 1) # end address
label(LOOP)
ldrb(r5, [r0, 0])
ldrb(r6, [r4, 0])
strb(r6, [r0, 0])
strb(r5, [r4, 0])
add(r0, 1)
sub(r4, 1)
cmp(r4, r0)
bpl(LOOP)
def reverse_bytes(input):
data = bytearray(input)
reverse(data, len(data))
return data
def decode_probe_status_data(data):
print(data)
if len(data) == 30:
result = unpack('<LL13sss7s', data)
log_min = result[0]
log_max = result[1]
raw_temp = reverse_bytes(result[2])
model_and_id = result[3]
battery_status_and_virtual_sensors = result[4]
mode = model_and_id[0] & 0x3
print(model_and_id.hex())
real_temps = extract_temp_data(raw_temp, mode)
prediction_status = result[5]
prediction_state = prediction_status[0] & 0xf
prediction_mode = (prediction_status[0] >> 4) & 0x3
prediction_type = (prediction_status[0] >> 6) & 0x3
set_point = (prediction_status[2] & 0x03) << 8 | prediction_status[1]
heat_start = (prediction_status[3] & 0x0f) << 6 | (prediction_status[2] & 0xfc) >> 2
seconds = ((prediction_status[5] & 0x1F) << 12) | (prediction_status[4] << 4) | ((prediction_status[3] & 0xF0) >> 4)
raw_core = (prediction_status[6] << 3) | ((prediction_status[5] & 0xE0) >> 5)
print(real_temps)
print(prediction_state, prediction_mode, prediction_type, set_point, heat_start, seconds, raw_core)
return real_temps
def decode_cpt_data(data):
# print(data)
# print(len(data))
if len(data) == 22:
result = unpack('<s4s13sss2s', data)
product_type = result[0]
if product_type == _CPT_PRODUCT_TYPE:
serial_number = reverse_bytes(result[1])
raw_temp = reverse_bytes(result[2])
model_and_id = result[3]
battery_status_and_virtual_sensors = result[4]
reserved = result[5]
# mode is bit 1-2 from model_and_id
# 0: Normal
# 1: Instant Read
# 2: Rserved
# 3: Error
modes = ['Normal', 'Instant Read', 'Reserved', 'Error']
mode = model_and_id[0] & 0x3
real_temps = extract_temp_data(raw_temp, mode)
# color_id is bit 3-5 from model_and_id
# 0: Yellow
# 1: Grey
# 2-7: TBD
colors = ['Yellow', 'Grey', 'TBD', 'TBD', 'TBD', 'TBD', 'TBD']
color_id = (model_and_id[0] >> 0x2) & 0x7
# print(mode, color_id)
# probe_id is bit 6-8 from model_and_id
probe_id = (model_and_id[0] >> 0x5) & 0x7
# print(probe_id)
# battery_status is bit 1 from battery_status_and_virtual_sensors
# 0: OK
# 1: Low
battery_statues = ['OK', 'Low']
battery_status = battery_status_and_virtual_sensors[0] & 0x1
# print(battery_status)
virtual_sensors = battery_status_and_virtual_sensors[0] >> 0x1
# virtual_core_sensor is bit 2-4 from battery_status_and_virtual_sensors
# 0: T1
# 1: T2
# 2: T3
# 3: T4
# 4: T5
# 5: T6
virtual_core_sensors = ['T1', 'T2', 'T3', 'T4', 'T5', 'T6']
virtual_core_sensor = virtual_sensors & 0x7
# print(virtual_core_sensor)
# virtual_surface_sensor is bit 5-6 from battery_status_and_virtual_sensors
# 0: T4
# 1: T5
# 2: T6
# 3: T7
virtual_surface_sensors = ['T4', 'T5', 'T6', 'T7']
virtual_surface_sensor = (virtual_sensors > 0x3) & 0x3
# print(virtual_surface_sensor)
# virtual_amibent_sensor us bit 7-8 from battery_status_and_virtual_sensors
# 0: T5
# 1: T6
# 2: T7
# 3: T8
virtual_amibent_sensors = ['T5', 'T6', 'T7', 'T8']
virtual_amibent_sensor = (virtual_sensors >> 0x5) & 0x3
# print(virtual_amibent_sensor)
# print(reserved)
result = {
'serial_number': serial_number.hex(),
'mode': modes[mode],
'color': colors[color_id],
'probe_id': probe_id,
'battery_status': battery_statues[battery_status],
'virtual_core_sensor': virtual_core_sensors[virtual_core_sensor],
'virtual_surface_sensor': virtual_surface_sensors[virtual_surface_sensor],
'virtual_amibent_sensor': virtual_amibent_sensors[virtual_amibent_sensor],
'temps': real_temps
}
if mode == 0:
result['vcs_temp'] = real_temps[virtual_core_sensor]
result['vss_temp'] = real_temps[virtual_surface_sensor + 3]
result['vas_temp'] = real_temps[virtual_amibent_sensor + 4]
return result
return None
import aioble
import json
import ssl
from mqtt_as import MQTTClient, config
from cpt import _CPT_COMPANY_ID, decode_cpt_data
import uasyncio as asyncio
import ntptime
from machine import WDT
ntptime.timeout = 10
# Let's Encrypt Authority
ca_certificate_path = "/certs/isrgrootx1.der"
print('Loading CA Certificate')
with open(ca_certificate_path, 'rb') as f:
cacert = f.read()
print('Obtained CA Certificate')
config['server'] = 'mqtt.internal.local'
config['ssid'] = 'WI-FI'
config['wifi_pw'] = 'PASSWORD'
config['user'] = 'MQTT_USER'
config['password'] = 'MQTT_PASSWORD'
config["queue_len"] = 1
# (Optional) Enable SSL/TLS support for the MQTT client
config['ssl'] = True
config['ssl_params'] = {'server_hostname': 'mqtt.internal.local', 'cadata': cacert, 'cert_reqs': ssl.CERT_REQUIRED}
client = MQTTClient(config)
async def up(client): # Respond to connectivity being (re)established
while True:
await client.up.wait() # Wait on an Event
client.up.clear()
async def find_cpt():
async with aioble.scan(1000, interval_us=30000, window_us=30000, active=True) as scanner:
async for result in scanner:
for m, d in result.manufacturer():
if m == 0x09c7:
return decode_cpt_data(d)
return None
wdt = None
async def main(client):
print ("Connecting to WiFi")
await client.wifi_connect()
print ("Updating system time")
ntptime.settime()
print ("Connecting to MQTT")
await client.connect()
print ("Connected")
asyncio.create_task(up(client))
# Watchdog timer
global wdt
wdt = WDT(timeout=5000)
while True:
result = await find_cpt()
if result:
await client.publish(f'cpt/{result["serial_number"]}', json.dumps(result), qos = 1)
wdt.feed()
await asyncio.sleep(1)
try:
asyncio.run(main(client))
finally:
client.close()
@riffnshred
Copy link

Dude, I was looking all over github for something like this. Listened to self, also look at Gist !!!

Thank you for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment