Skip to content

Instantly share code, notes, and snippets.

@deepcoder
Created February 10, 2023 01:05
Show Gist options
  • Save deepcoder/3aa75b2368e048c030bc5f79f9870680 to your computer and use it in GitHub Desktop.
Save deepcoder/3aa75b2368e048c030bc5f79f9870680 to your computer and use it in GitHub Desktop.
Python 3 code to test decoding Aranet4 CO2 sensor advertising packets. TODO: add this code to the OpenMQTTGateway decoder library
#! /usr/bin/env python3
# aranet4-decode.py
# decode aranet4 co2 monitor BLE advertising packets
# 202302091406
#
# https://github.com/Anrijs/Aranet4-ESP32/blob/main/src/Aranet4.h
# ":" (colon) in C struct - what does it mean? [duplicate]
# https://stackoverflow.com/questions/8564532/colon-in-c-struct-what-does-it-mean
#
# typedef struct AranetManufacturerData {
# uint16_t manufacturer_id;
# uint8_t disconnected : 1,
# __unknown1 : 1,
# calib_state : 2,
# dfu_mode : 1,
# integrations : 1,
# __unknown2 : 2;
# struct {
# uint8_t patch;
# uint8_t minor;
# uint16_t major;
# } version;
# uint8_t hw_rev;
# uint16_t __unknown3;
# AranetData data;
# typedef struct AranetData {
# uint16_t co2 = 0;
# uint16_t temperature = 0;
# uint16_t pressure = 0;
# uint8_t humidity = 0;
# uint8_t battery = 0;
# uint8_t unkn = 0;
# uint16_t interval = 0;
# uint16_t ago = 0;
# program identity: the name is used as the MQTT client id and, together with
# the version parts, printed in the start-up banner
PROGRAM_NAME = "aranet4-decode"
VERSION_MAJOR, VERSION_MINOR = "1", "6"

# not referenced anywhere in the visible part of this file
WORKING_DIRECTORY = ""
import sys
import threading
from datetime import datetime, timezone
import time
import json
import paho.mqtt.client as mqtt
# for healthcheck web server
import os
from http.server import HTTPServer, CGIHTTPRequestHandler
# global thread control flag; the healthcheck thread reads it, main() sets it
STOP_THREADS = False

# port of the healthcheck web server
# MAKE SURE this is the same value as in the Dockerfile!
HEALTHCHECK_PORT = 8998

# mqtt setup: broker address and base topic the decoded readings are published under
MQTT_SERVER = "192.168.2.242"
MQTT_TOPIC_BASE = "aranet4"

# manufacturer id from the advertising packet must match this (decimal 1794)
SENSOR_MANUFACTURER_ID = 0x0702
# this routine creates a 16-bit integer value from two bytes given as four ASCII hex characters
# data bytes are little-endian in Bluetooth (the same byte order Intel uses), so for a 16-bit integer the 1st byte is the low half and the 2nd byte is the high half
def le16(data: str, start: int = 0) -> int:
    """Return the little-endian 16-bit integer stored in a hex string.

    Args:
        data: string of ASCII hex digits, two characters per byte.
        start: character index of the first (low) byte.

    Returns:
        The low byte plus 256 times the high byte.

    Raises:
        ValueError: if either two-character slice is not valid hexadecimal
            (for example because ``data`` is too short).
    """
    # the low byte comes first, the high byte follows it
    low_byte, high_byte = (
        int(data[pos : pos + 2], 16) for pos in (start, start + 2)
    )
    return low_byte + high_byte * 256
# Layout of the manufacturer data hex string (two hex characters per byte),
# following the AranetManufacturerData / AranetData structs quoted in the
# header comment of this file.
_ARANET4_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")
# manufacturer_id (2) + flags (1) + patch (1) + minor (1) + major (2) + hw_rev (1) bytes
_ARANET4_HEADER_MIN_HEX_LEN = 16
# the sensor readings follow the 10-byte header
_ARANET4_READINGS_HEX_OFFSET = 20
# co2 (2) + temperature (2) + pressure (2) + humidity (1) + battery (1)
# + unkn (1) + interval (2) + ago (2) = 13 bytes
_ARANET4_READINGS_HEX_LEN = 26


def _aranet4_decode_header(ap):
    """Return (flags, major, minor, patch, hw_rev) decoded from the packet header."""
    # flags byte rendered as an 8-character bit string, e.g. "00100000"
    device_flags = format(int(ap[4:6], 16), "08b")
    software_patch = int(ap[6:8], 16)
    software_minor = int(ap[8:10], 16)
    software_major = le16(ap, 10)
    hardware_version = int(ap[14:16], 16)
    return device_flags, software_major, software_minor, software_patch, hardware_version


def _aranet4_decode_readings(sp):
    """Return the sensor readings decoded from the AranetData part of the packet."""
    # seconds between readings as configured in the app (60, 120, ...)
    interval = le16(sp, 18)
    return {
        # co2 value
        "co2": le16(sp, 0),
        # raw value is 1/20 degree centigrade; published in fahrenheit
        "temperature": round((le16(sp, 4) / 20.0) * 1.8 + 32.0, 1),
        # atmospheric pressure, raw value is 1/10 hPa
        "pressure": int(round(le16(sp, 8) / 10.0, 0)),
        # humidity and battery level are single bytes holding a percentage
        "humidity": int(sp[12:14], 16),
        "battery": int(sp[14:16], 16),
        # both keys carry the struct's `interval` field; "reading_interval" is
        # kept so existing consumers of either key keep working
        "interval": interval,
        "reading_interval": interval,
        # struct field `ago`; presumably seconds since the last reading -- TODO confirm
        "ago": le16(sp, 22),
    }


def on_message(mqttc, userdata, message):
    """Decode an Aranet4 advertising packet relayed over MQTT and republish it.

    The payload is expected to be a JSON object with the keys "id" (bluetooth
    mac address), "manufacturerdata" (advertising packet as a hex string) and
    optionally "rssi". Packets whose manufacturer id matches
    SENSOR_MANUFACTURER_ID are decoded and published as JSON to
    MQTT_TOPIC_BASE/<id> with QoS 1.

    Payloads that are not JSON objects, or that carry no usable
    "manufacturerdata", are ignored instead of raising: an exception escaping
    this callback would otherwise propagate into the MQTT network loop.

    Args:
        mqttc: client that received the message; used to publish the result.
        userdata: unused.
        message: received message; only its ``payload`` (bytes) is read.
    """
    try:
        msg = json.loads(message.payload.decode("utf-8"))
    except ValueError:
        # covers both undecodable bytes and invalid JSON
        return
    if not isinstance(msg, dict) or "id" not in msg:
        return

    # get raw advertising packet data; many devices on the wildcard topic send none
    ap = msg.get("manufacturerdata")
    if not isinstance(ap, str) or len(ap) < 4 or not set(ap) <= _ARANET4_HEX_DIGITS:
        return

    # get time stamp of info retrieved in UTC time
    retrieve_time = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    # get bluetooth mac address of sensor
    aranet4_mac = msg["id"]
    # get bluetooth signal quality (None when the gateway did not report it)
    rssi = msg.get("rssi")

    # check if device manufacturer id is correct
    device_id = le16(ap, 0)
    if device_id != SENSOR_MANUFACTURER_ID:
        print("Device has wrong manufacturer id : ", device_id)
        return

    if len(ap) < _ARANET4_HEADER_MIN_HEX_LEN:
        print(retrieve_time, aranet4_mac, "Advertising packet too short to decode", ap)
        return
    header = _aranet4_decode_header(ap)

    # 2nd half of the advertising packet holds the dynamic sensor values; it is
    # absent when the 'smart home integrations' are turned off for the device
    sp = ap[_ARANET4_READINGS_HEX_OFFSET:]
    if len(sp) < _ARANET4_READINGS_HEX_LEN:
        print(retrieve_time, aranet4_mac, "Smart Home integration may not be enabled, no sensor advertising data", *header)
        return

    # create json message
    xjs = {"retrieve_time": retrieve_time, **_aranet4_decode_readings(sp), "rssi": rssi}

    # publish to a subtopic named after the bluetooth address of the aranet4
    mqttc.publish(f"{MQTT_TOPIC_BASE}/{aranet4_mac}", json.dumps(xjs), 1)
    print(retrieve_time, aranet4_mac, *header)
def healthcheck_webserver():
    """Serve HTTP on HEALTHCHECK_PORT until STOP_THREADS becomes true.

    Intended to run in its own thread. Requests are handled one at a time
    with a one second accept timeout so the stop flag is re-checked at least
    once per second; the listening socket is closed when the loop ends.
    """
    # NOTE(review): CGIHTTPRequestHandler serves the files of the current
    # working directory and can execute scripts in its cgi directories; that
    # is more than a healthcheck endpoint needs -- consider a minimal handler.
    # Create server object listening on the port set at top of program
    server_object = HTTPServer(server_address=('0.0.0.0', HEALTHCHECK_PORT), RequestHandlerClass=CGIHTTPRequestHandler)
    # make handle_request() return after one second without a request
    server_object.timeout = 1
    try:
        # serve_forever() would never return and the stop flag would be ignored
        while not STOP_THREADS:
            server_object.handle_request()
    finally:
        server_object.server_close()
def main():
    """Start the healthcheck server and the MQTT client, then idle until stopped.

    Exits the process with status 0 on Ctrl-C and with status 1 when the MQTT
    connection cannot be set up or an unexpected error occurs.
    """
    global STOP_THREADS

    # start simple web server for healthcheck; it is a daemon thread so that
    # it can never keep the process alive once main() has exited
    healthcheck = threading.Thread(target=healthcheck_webserver, daemon=True)
    healthcheck.start()

    def _subscribe_on_connect(client, userdata, *args):
        # subscribing here renews the subscription after every reconnect
        client.subscribe("home/OpenMQTTGateway/BTtoMQTT/#")

    try:
        # connect to MQTT server
        mqttc = mqtt.Client(PROGRAM_NAME)  # Create instance of client with client ID
        # callbacks are registered before connecting so no message is missed
        mqttc.on_connect = _subscribe_on_connect
        # procedure to execute on MQTT message in topic received
        mqttc.on_message = on_message
        mqttc.connect(MQTT_SERVER, 1883)  # Connect to (broker, port)
        print("Program start : " + PROGRAM_NAME + " Version : " + VERSION_MAJOR + "." + VERSION_MINOR)
        # Start mqtt network loop in a background thread
        mqttc.loop_start()
    except Exception as e:
        print("cannot initialize MQTT connection: " + MQTT_SERVER + " " + str(e))
        STOP_THREADS = True
        sys.exit(1)

    exit_code = 0
    try:
        # all work happens in the MQTT callbacks; just keep the main thread alive
        while True:
            time.sleep(30)
    except KeyboardInterrupt:
        print("Ctrl-c exiting")
    except Exception as e:
        print("Unhandled error : " + str(e))
        exit_code = 1
    finally:
        # stop the helper thread and the MQTT loop on every exit path
        STOP_THREADS = True
        mqttc.disconnect()
        mqttc.loop_stop()
    sys.exit(exit_code)
# run the program only when executed as a script, not when imported
if __name__ == "__main__":
    main()
@DigiH
Copy link

DigiH commented Jun 20, 2023

Hi @deepcoder,

we stumbled across your gist here, with the comment

TODO : add code to OpenMQTTGateway decoder library

If you feel like collaborating on such a decoder, please open a discussion for Theengs Decoder.

Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment