Skip to content

Instantly share code, notes, and snippets.

@karakum
Created April 29, 2020 20:14
Show Gist options
  • Save karakum/e5b18e625dd01dcff8bc63beffee6d16 to your computer and use it in GitHub Desktop.
Save karakum/e5b18e625dd01dcff8bc63beffee6d16 to your computer and use it in GitHub Desktop.
Cleargrass Air Detector to mqtt
#!/usr/bin/python3
from time import sleep, time
import miio
import argparse
import time
import paho.mqtt.client as mqtt
import io
import sys
import signal
# Script version, shown in the start-up banner.
version = "0.1"

# Command-line interface: MQTT broker location, topic prefix and verbosity.
parser = argparse.ArgumentParser(description='ClearGrass Air Monitor MQTT poller')
parser.add_argument('--mqtt-host', default='localhost',
                    help='MQTT server address. Defaults to "localhost"')
# The default is given as an int so it already has the declared type.
parser.add_argument('--mqtt-port', default=1883, type=int,
                    help='MQTT server port. Defaults to 1883')
parser.add_argument('--mqtt-topic', default='air-monitor/',
                    help='Topic prefix to be used for subscribing/publishing. Defaults to "air-monitor/"')
parser.add_argument('--verbose', action='store_true',
                    help='Print progress messages (start-up, MQTT and monitor connection)')
args = parser.parse_args()

# Module-wide flag that gates the informational print() calls.
verbosity = args.verbose
class Control:
    """Hold the run flag that keeps the polling loop alive.

    ``runLoop`` starts as ``True``; calling ``stopLoop()`` sets it to
    ``False``.
    """

    def __init__(self):
        self.runLoop = True

    def stopLoop(self):
        """Clear the run flag."""
        self.runLoop = False
# Run flag object; its runLoop attribute is cleared through stopLoop().
control = Control()

# Normalise the topic prefix so sub-topics can be appended without
# worrying about the separator.
globaltopic = args.mqtt_topic
if not globaltopic.endswith("/"):
    globaltopic += "/"

if verbosity:
    print('Starting ClearGrass Air Monitor MQTT poller V%s with topic prefix \"%s\"' % (version, globaltopic))
def signal_handler(signal, frame):
    """Handle SIGINT: announce the shutdown and clear the run flag.

    Both arguments are supplied by the interpreter and are not used.
    NOTE: the first parameter shadows the ``signal`` module inside this
    function; that is harmless because the body never uses the module.
    """
    print('Exiting ' + sys.argv[0])
    control.stopLoop()


# Install the handler for SIGINT (Ctrl+C).
signal.signal(signal.SIGINT, signal_handler)
def connecthandler(mqc, userdata, flags, rc):
    """MQTT on-connect callback: publish the "connected" status.

    Publishes a retained ``1`` to ``<topic prefix>connected`` on *mqc*.
    When verbose output is enabled, the result code *rc* is printed first.
    *userdata* and *flags* are accepted for the callback signature and
    are not used.
    """
    if verbosity:
        print("Connected to MQTT broker with rc=%d" % (rc))
    # Published regardless of verbosity so the status topic is always set.
    mqc.publish(globaltopic + "connected", 1, qos=0, retain=True)
def disconnecthandler(mqc, userdata, rc):
    """MQTT on-disconnect callback: report the disconnect when verbose.

    Prints the result code *rc* if verbose output is enabled and does
    nothing otherwise. *mqc* and *userdata* are not used.
    """
    if verbosity:
        print("Disconnected from MQTT broker with rc=%d" % (rc))
# Build the MQTT client. The client id is the topic prefix plus the
# current timestamp, so each start of the script gets a different id.
clientid = globaltopic + "-" + str(time.time())
mqc = mqtt.Client(client_id=clientid)
mqc.on_connect = connecthandler
mqc.on_disconnect = disconnecthandler
# Last will: a retained 0 on the same "connected" topic that
# connecthandler sets to 1.
mqc.will_set(globaltopic + "connected", 0, qos=2, retain=True)
# NOTE(review): this attribute is not read anywhere in the visible code.
mqc.disconnected = True
# Third argument is 60; presumably the keepalive in seconds - confirm
# against the paho-mqtt documentation.
mqc.connect(args.mqtt_host, args.mqtt_port, 60)
# Start the client's network loop.
mqc.loop_start()
# Address and token of the air monitor. These are hard-coded; replace
# them with the values for your own device.
MONITOR_IP = '192.168.1.138'
MONITOR_TOKEN = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
monitor_conn = None
# Factor applied to the raw tVOC reading before it is published.
tvocKoef = 0.0045714285714286

# Poll the monitor and publish its readings until the run flag is cleared.
while control.runLoop:
    sleep_time = 5  # seconds to wait after a successful poll
    try:
        # Create the device object once and reuse it on later iterations.
        if monitor_conn is None:
            if verbosity:
                print("Connecting to the monitor...")
            monitor_conn = miio.airqualitymonitor.AirQualityMonitor(
                ip=MONITOR_IP, token=MONITOR_TOKEN, model='cgllc.airmonitor.s1')
            if verbosity:
                print("Connected!")
        st = monitor_conn.status()
        # Scaled tVOC value, rounded to three decimals.
        tvoc = round(st.tvoc * tvocKoef, 3)
        # Each reading is published retained under "<prefix>state/<name>".
        for name in ("battery", "temperature", "humidity", "co2", "pm25"):
            mqc.publish(globaltopic + "state/" + name, getattr(st, name), qos=0, retain=True)
        mqc.publish(globaltopic + "state/tvoc", tvoc, qos=0, retain=True)
    except Exception as ex:
        # Best effort: report the failure and retry after a shorter pause
        # instead of terminating the poller.
        print(ex)
        sleep_time = 1
    sleep(sleep_time)

# The run flag was cleared: stop the MQTT network loop and exit.
# NOTE(review): the exit status is 1 even on this regular shutdown path.
mqc.loop_stop()
sys.exit(1)
@GreyEarl
Copy link

Great script! I stumbled upon this when I was searching for MQTT possibilities with this device. After I contacted Pingqing about their API, they sent me a (Chinese) document. There are references to MQTT, but I don't know if the device is capable of sending out MQTT messages locally. If you are interested in the document, I can send it over.

@karakum
Copy link
Author

karakum commented May 12, 2020

Hi! Here the author successfully "redirects" the device traffic to a local MQTT broker. I know the device sends it to the Chinese cloud.

@GreyEarl
Copy link

Hi! Here the author successfully "redirects" the device traffic to a local MQTT broker. I know the device sends it to the Chinese cloud.

Ah, that's great! Thanks for the link! Great article! This never appeared in my search results :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment