Skip to content

Instantly share code, notes, and snippets.

@Romern
Created February 18, 2023 12:52
Show Gist options
  • Save Romern/f97f8d6918c9709c62444f8325e31e46 to your computer and use it in GitHub Desktop.
Save Romern/f97f8d6918c9709c62444f8325e31e46 to your computer and use it in GitHub Desktop.
wp6003 BLE to MQTT and integration for home assistant

ble2mqtt was very unstable for me, and I only use the WP6003 sensor anyway, so I wrote this small Python script. It is based on https://github.com/zu2/wp6003. Add the contents of configuration.yaml to Home Assistant's configuration.yaml.

Requirements: bleak, paho-mqtt, click

mqtt:
  sensor:
    - name: "Temperature"
      state_topic: "bedroom/wp6003"
      unit_of_measurement: "°C"
      value_template: "{{ value_json.temperature }}"
    - name: "TVOC"
      state_topic: "bedroom/wp6003"
      unit_of_measurement: "mg/m³"
      value_template: "{{ value_json.tvoc }}"
    - name: "HCHO"
      state_topic: "bedroom/wp6003"
      unit_of_measurement: "mg/m³"
      value_template: "{{ value_json.hcho }}"
    - name: "CO2"
      state_topic: "bedroom/wp6003"
      unit_of_measurement: "ppm"
      value_template: "{{ value_json.co2 }}"
import asyncio
import click
import datetime
import json
import paho.mqtt.client as mqtt
from bleak import BleakClient, BleakGATTCharacteristic
# GATT identifiers used to talk to the WP6003. All three share the same
# 128-bit suffix and differ only in the 16-bit short id at the front.
_BASE_UUID_SUFFIX = '-0000-1000-8000-00805F9B34FB'
SERVICE = '0000FFF0' + _BASE_UUID_SUFFIX  # service that holds both characteristics
COMMAND = '0000FFF1' + _BASE_UUID_SUFFIX  # characteristic that commands are written to
SENSOR = '0000FFF4' + _BASE_UUID_SUFFIX  # characteristic that delivers notifications
# Shared MQTT client; callback() connects it and publishes through it.
mqtt_client = mqtt.Client()
# Topic that readings are published to; main() overwrites this with --topic.
mqtt_topic = None
def callback(sender: "BleakGATTCharacteristic", data: bytearray):
    """Decode one sensor notification, print it and publish it over MQTT.

    Each measurement is a 16-bit big-endian value read from a fixed offset
    in ``data``: temperature (bytes 6-7, tenths of a degree), TVOC (bytes
    10-11, thousandths of mg/m3), HCHO (bytes 12-13, thousandths of mg/m3)
    and CO2 (bytes 16-17, ppm). The decoded values are published as a JSON
    object with the keys ``temperature``, ``tvoc``, ``hcho`` and ``co2`` on
    the topic stored in the module-level ``mqtt_topic``.

    Args:
        sender: Characteristic the notification came from (unused).
        data: Raw notification payload.

    Returns:
        None. Payloads shorter than 18 bytes are reported and dropped.
    """
    # The highest index read below is data[17] (CO2 low byte), so a payload
    # needs at least 18 bytes. A smaller threshold lets 16- and 17-byte
    # payloads through to an IndexError.
    if len(data) < 18:
        print("Hmm, little data here:", data)
        return

    print('Notify:',datetime.datetime.now(),' ',data.hex())
    print("Time: 20",data[1],"/",data[2],"/",data[3]," ",data[4],":",data[5],sep='')

    # NOTE(review): values are decoded as unsigned; confirm how the device
    # encodes sub-zero temperatures.
    temperature = int.from_bytes(data[6:8], "big") / 10
    print('Temp:',temperature,'℃')
    tvoc = int.from_bytes(data[10:12], "big") / 1000
    print('TVOC:',tvoc,'mg/㎥')
    hcho = int.from_bytes(data[12:14], "big") / 1000
    print('HCHO:',hcho,'mg/㎥')
    co2 = int.from_bytes(data[16:18], "big")
    print('CO2 :',co2,'ppm')

    reading = {"temperature": temperature, "tvoc": tvoc, "hcho": hcho, "co2": co2}
    # NOTE(review): the client is (re)connected before every publish; the
    # original author was unsure whether this is required. Kept as-is so the
    # publish path behaves exactly as before.
    mqtt_client.connect("127.0.0.1")
    mqtt_client.publish(mqtt_topic, payload=json.dumps(reading))
async def main_async(address, interval):
    """Open a BLE session with the sensor and poll it forever.

    After connecting to ``address`` the coroutine looks up the command and
    sensor characteristics, writes the start-up sequence (0xee, 0xaa
    followed by the current local date and time, 0xae, 0xab), subscribes
    ``callback`` to sensor notifications and then writes 0xab again after
    every ``interval`` seconds of sleep. The loop has no exit condition, so
    the coroutine only ends when an exception propagates out of it.

    Args:
        address: Address of the device, passed straight to ``BleakClient``.
        interval: Seconds to sleep between two 0xab requests.
    """
    async with BleakClient(address) as ble:
        print("Getting service...")
        gatt_service = ble.services.get_service(SERVICE)
        print("Getting command...")
        command_char = gatt_service.get_characteristic(COMMAND)
        print("Getting sensor...")
        sensor_char = gatt_service.get_characteristic(SENSOR)

        async def send(payload):
            # Every command goes to the same characteristic.
            await ble.write_gatt_char(command_char, payload)

        print("Sending init command...")
        await send(b"\xee")

        print("Sending datetime...")
        now = datetime.datetime.now()
        # 0xaa followed by two-digit year, month, day, hour, minute, second.
        clock_fields = (now.year % 100, now.month, now.day, now.hour, now.minute, now.second)
        await send(bytes((0xaa, *clock_fields)))

        print("Sending 2nd init command...")
        await send(b"\xae")

        print("Sending 3rd init command...")
        await send(b"\xab")

        print("Read Sensor data:")
        await ble.start_notify(sensor_char, callback)

        # The same 0xab command written during start-up is repeated here.
        while True:
            await asyncio.sleep(interval)
            await send(b"\xab")
# Command-line entry point: click parses the options declared below and passes
# them to main() as keyword arguments. (Documented with comments rather than a
# docstring, because click would print a docstring in the --help output.)
@click.command()
@click.option(
    '--mac',
    required=True,
    help='Mac adress of WP6003 sensor.',
)
@click.option(
    '--interval',
    default=10,
    help='Interval in seconds when to retrieve data.',
)
@click.option(
    '--topic',
    default="bedroom/wp6003",
    help='topic on which the data is published on the local mqtt server',
)
def main(mac, interval, topic):
    # The notification callback reads the topic from this module-level name.
    global mqtt_topic
    mqtt_topic = topic

    # Run the BLE session on a fresh event loop until it ends.
    session = main_async(mac, interval)
    asyncio.run(session)
# Start the CLI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment