import gc | |
import ujson | |
# from libs import ssd1306 | |
from machine import Pin, I2C, reset | |
from dht import DHT11 | |
from time import sleep_ms | |
from ujson import dump, load | |
from umqtt.simple import MQTTClient | |
id = '***********' | |
server = '***********' | |
port = 1111 | |
username = '***********' | |
password = '***********' | |
led17 = Pin(17, Pin.OUT) | |
led18 = Pin(18, Pin.OUT) | |
# d = DHT11(Pin(4)) | |
# i2c = I2C(-1, Pin(22), Pin(21)) # slc, sda | |
# display = ssd1306.SSD1306_I2C(128, 64, i2c) | |
# display.fill(0) | |
def writelastcfg(pin0=17, value0=0, pin1=18, value1=0): | |
lastcfg = {pin0: value0, pin1: value1} | |
with open('db.json', 'w') as f: | |
dump(lastcfg, f) | |
print('State writed successfull.') | |
def writelastcfg(pin0=17, value0=0, pin1=18, value1=0): | |
lastcfg = {pin0: value0, pin1: value1} | |
with open('db.json', 'w') as f: | |
dump(lastcfg, f) | |
print('State writed successfull.') | |
def getlastcfg(): | |
try: | |
with open('db.json', 'r') as f: | |
lastcfg = load(f) | |
led17.value(lastcfg[17]) | |
led18.value(lastcfg[18]) | |
print('Last config applied successfull.') | |
except: | |
led17.value(0) | |
led18.value(0) | |
print('Some error to get last states, state 0 applied.') | |
def convert_str_dict(str): | |
try: | |
a = str.replace("'", "\"") | |
return ujson.loads(a) | |
except: | |
print('Fail to convert string {} to dict.'.format(str)) | |
def flash_led_2(): | |
led = Pin(2, Pin.OUT, value=1) | |
led.value(0) | |
sleep_ms(50) | |
led.value(1) | |
sleep_ms(50) | |
# def send_dht(c): | |
# try: | |
# d.measure() | |
# data = {'temperature': d.temperature(), 'humidity': d.humidity()} | |
# sleep_ms(1000) | |
# display.fill(0) | |
# display.text('T: {}, H: {}'.format(d.temperature(), d.humidity()), 0, 10) | |
# c.publish('esp32/dht11', '{}'.format(data)) | |
# except OSError as e: | |
# print('Some error to read DHT11 data: {}'.format(e)) | |
# receive all notifications from topic | |
def sub_cb(topic, msg): | |
gc.collect() | |
getlastcfg() | |
# display.fill(0) | |
msg = msg.decode('utf-8') | |
topic = topic.decode('utf-8') | |
msg = convert_str_dict(msg) | |
led = Pin(msg['pin'], Pin.OUT) | |
print('Received from topic: {}, with message: {}!'.format(topic, msg)) | |
# display.text('T: {}!'.format(topic), 0, 20) | |
flash_led_2() | |
if msg['state'] == 'toggle': | |
state = 1-led.value() | |
led.value(state) | |
if msg['pin'] == 17: | |
writelastcfg(pin0=17, value0=state, pin1=18, value1=led18.value()) | |
elif msg['pin'] == 18: | |
writelastcfg(pin0=17, value0=led17.value(), pin1=18, value1=state) | |
print('Changing state of pin {} to {}'.format(msg['pin'], state)) | |
# display.text('Pin {} to {}'.format(msg['pin'], value), 0, 30) | |
elif msg['state'] is 0 or msg['state'] is 1: | |
value = int(msg['state']) | |
led.value(value) | |
if msg['pin'] == 17: | |
writelastcfg(pin0=17, value0=value, pin1=18, value1=led18.value()) | |
elif msg['pin'] == 18: | |
writelastcfg(pin0=17, value0=led17.value(), pin1=18, value1=value) | |
print('Changing state of pin {} to {}'.format(msg['pin'], value)) | |
# display.text('Pin {} to {}'.format(msg['pin'], value), 0, 30) | |
# connect with mqtt client | |
def main(id, server, port, username, password): | |
c = MQTTClient(id, server, port, username, password) | |
c.set_callback(sub_cb) | |
c.connect() | |
c.subscribe('esp32/leds') | |
print('Connected!') | |
# display.text('Connected!', 0, 10) | |
sleep_ms(1000) | |
# display.fill(0) | |
# display.show() | |
try: | |
while True: | |
# c.wait_msg() | |
c.check_msg() | |
gc.collect() | |
# send_dht(c) | |
# display.show() | |
except OSError as e: | |
print('Error: {}'.format(e)) | |
print('Failed to connect to MQTT broker. Reconnecting...') | |
sleep_ms(5000) | |
reset() | |
finally: | |
print('Disconnecting...') | |
c.disconnect() | |
# function called in main.py | |
def connect(): | |
getlastcfg() | |
main(id, server, port, username, password) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment