Skip to content

Instantly share code, notes, and snippets.

@bergpb bergpb/mqtt.py
Created Jul 4, 2019

Embed
What would you like to do?
import gc
import ujson
# from libs import ssd1306
from machine import Pin, I2C, reset
from dht import DHT11
from time import sleep_ms
from ujson import dump, load
from umqtt.simple import MQTTClient
# MQTT connection settings (masked placeholders). connect() forwards them,
# in this order, to main(), which hands them to MQTTClient.
# NOTE(review): `id` shadows the Python builtin of the same name; it is kept
# because connect() and main() refer to it by this name.
id = '***********'
server = '***********'
port = 1111
username = '***********'
password = '***********'
# GPIO 17 and 18 configured as outputs. getlastcfg() drives them and
# sub_cb() reads their current value when persisting state.
led17 = Pin(17, Pin.OUT)
led18 = Pin(18, Pin.OUT)
# Disabled: DHT11 sensor and SSD1306 display setup.
# d = DHT11(Pin(4))
# i2c = I2C(-1, Pin(22), Pin(21)) # slc, sda
# display = ssd1306.SSD1306_I2C(128, 64, i2c)
# display.fill(0)
def writelastcfg(pin0=17, value0=0, pin1=18, value1=0):
    """Persist the state of two pins to ``db.json``.

    The file holds a single JSON object mapping each pin number (as a
    string) to its value, e.g. ``{"17": 1, "18": 0}``.

    Args:
        pin0: Number of the first pin.
        value0: Value to store for ``pin0``.
        pin1: Number of the second pin.
        value1: Value to store for ``pin1``.
    """
    # JSON object keys must be strings. Integer keys do not round-trip:
    # CPython's json silently coerces them to strings, and a serializer that
    # does not coerce would emit an unquoted key, which is not valid JSON.
    # Writing string keys explicitly keeps the file well-formed either way.
    lastcfg = {str(pin0): value0, str(pin1): value1}
    with open('db.json', 'w') as f:
        dump(lastcfg, f)
    print('State writed successfull.')
def getlastcfg():
    """Restore the LED states saved in ``db.json``.

    Reads the JSON object from ``db.json`` and applies the stored values to
    ``led17`` and ``led18``. If the file is missing, unreadable, or does not
    contain both pins, both LEDs are switched to 0 instead.
    """
    try:
        with open('db.json', 'r') as f:
            lastcfg = load(f)
        # A JSON object always comes back with string keys ("17"), so looking
        # up the integer 17 directly can never succeed. Normalise the keys to
        # int so both string and integer keys are accepted.
        states = {int(key): value for key, value in lastcfg.items()}
        # Resolve both values before touching the pins so a missing entry
        # cannot leave one LED restored and the other not.
        state17, state18 = states[17], states[18]
        led17.value(state17)
        led18.value(state18)
        print('Last config applied successfull.')
    except Exception:
        # Best-effort restore: any failure falls back to a known-safe state.
        # `Exception` (rather than a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        led17.value(0)
        led18.value(0)
        print('Some error to get last states, state 0 applied.')
def convert_str_dict(str):
    """Parse a dict-like string into a Python object.

    Single quotes are replaced with double quotes before the text is handed
    to the JSON parser, so a Python-style literal such as
    ``{'pin': 17, 'state': 1}`` is accepted as well as real JSON.
    Note that this replacement is naive: a value that itself contains an
    apostrophe will be corrupted and fail to parse.

    Args:
        str: The text to parse (the name shadows the builtin; it is kept for
            backward compatibility with existing callers).

    Returns:
        The decoded object, or ``None`` if the text could not be parsed.
    """
    try:
        return ujson.loads(str.replace("'", '"'))
    except (ValueError, AttributeError, TypeError):
        # ValueError: invalid JSON. AttributeError/TypeError: the argument is
        # not a text string. Anything else (e.g. KeyboardInterrupt) propagates.
        print('Fail to convert string {} to dict.'.format(str))
        return None
def flash_led_2():
    """Pulse pin 2: drive it to 0 for 50 ms, then back to 1 for 50 ms.

    The pin is (re)configured as an output with an initial value of 1 on
    every call.
    """
    indicator = Pin(2, Pin.OUT, value=1)
    for level in (0, 1):
        indicator.value(level)
        sleep_ms(50)
# def send_dht(c):
# try:
# d.measure()
# data = {'temperature': d.temperature(), 'humidity': d.humidity()}
# sleep_ms(1000)
# display.fill(0)
# display.text('T: {}, H: {}'.format(d.temperature(), d.humidity()), 0, 10)
# c.publish('esp32/dht11', '{}'.format(data))
# except OSError as e:
# print('Some error to read DHT11 data: {}'.format(e))
# receive all notifications from topic
def sub_cb(topic, msg):
    """Handle one MQTT message that requests an LED state change.

    The payload is expected to decode to a dict with two keys:

    * ``pin``   - the GPIO number to drive.
    * ``state`` - ``'toggle'`` to invert the pin, or ``0`` / ``1`` to set it.

    After the pin is driven, the states of pins 17 and 18 are persisted via
    ``writelastcfg`` when the message targeted one of them. Malformed
    messages and unknown ``state`` values are ignored rather than raised, so
    a bad payload cannot take down the message loop.

    Args:
        topic: Topic name as bytes.
        msg: Message payload as bytes.
    """
    gc.collect()
    # Re-apply the persisted state before acting on the message.
    getlastcfg()
    topic = topic.decode('utf-8')
    payload = convert_str_dict(msg.decode('utf-8'))

    # convert_str_dict returns None on a parse failure, and valid JSON need
    # not be an object with the expected keys - reject both up front.
    if not isinstance(payload, dict) or 'pin' not in payload or 'state' not in payload:
        print('Ignoring malformed message from topic: {}.'.format(topic))
        return

    pin = payload['pin']
    requested = payload['state']
    try:
        led = Pin(pin, Pin.OUT)
    except (ValueError, TypeError):
        # The pin number comes from the network; it may not be a usable GPIO.
        print('Ignoring message for invalid pin: {}.'.format(pin))
        return

    print('Received from topic: {}, with message: {}!'.format(topic, payload))
    flash_led_2()

    if requested == 'toggle':
        state = 1 - led.value()
    elif requested in (0, 1):
        # Equality, not identity: `is 0` / `is 1` only works by accident of
        # small-integer caching and is not a valid value comparison.
        state = int(requested)
    else:
        # Unknown state request: nothing to change.
        return

    led.value(state)
    # Only pins 17 and 18 are persisted; the other one is read back live.
    if pin == 17:
        writelastcfg(pin0=17, value0=state, pin1=18, value1=led18.value())
    elif pin == 18:
        writelastcfg(pin0=17, value0=led17.value(), pin1=18, value1=state)
    print('Changing state of pin {} to {}'.format(pin, state))
# connect with mqtt client
def main(id, server, port, username, password):
    """Connect to the MQTT broker and process ``esp32/leds`` messages forever.

    Messages are dispatched to ``sub_cb``. On any ``OSError`` (including a
    failure of the initial connection) the error is printed, the function
    waits 5 seconds and then calls ``reset()`` to restart the board.

    Args:
        id: MQTT client id.
        server: Broker host name or address.
        port: Broker port.
        username: Broker user name.
        password: Broker password.
    """
    c = MQTTClient(id, server, port, username, password)
    c.set_callback(sub_cb)
    connected = False
    try:
        # Connecting and subscribing live inside the try so that an initial
        # connection failure takes the same report-and-reset path as a
        # connection lost later on.
        c.connect()
        connected = True
        c.subscribe('esp32/leds')
        print('Connected!')
        sleep_ms(1000)
        while True:
            # Non-blocking poll; pending messages are delivered to sub_cb.
            c.check_msg()
            gc.collect()
    except OSError as e:
        print('Error: {}'.format(e))
        print('Failed to connect to MQTT broker. Reconnecting...')
        sleep_ms(5000)
        reset()
    finally:
        print('Disconnecting...')
        if connected:
            try:
                c.disconnect()
            except OSError:
                # The socket is already dead; there is nothing left to close
                # and raising here would mask the original error.
                pass
# function called in main.py
def connect():
    """Entry point: restore the saved LED states, then start the MQTT loop.

    Uses the module-level connection settings.
    """
    getlastcfg()
    main(
        id=id,
        server=server,
        port=port,
        username=username,
        password=password,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.