Skip to content

Instantly share code, notes, and snippets.

@Jan200101
Created July 16, 2024 18:43
Show Gist options
  • Save Jan200101/ba0545e5670084547a8b9d6a077f32df to your computer and use it in GitHub Desktop.
Save Jan200101/ba0545e5670084547a8b9d6a077f32df to your computer and use it in GitHub Desktop.
from micropython import const
from machine import Pin
from rp2 import bootsel_button
import network
import aiohttp
import asyncio
import logging
import sys
import ntptime
# Module-wide log level constant.
_level = logging.DEBUG

# Shared formatter producing "[timestamp][logger name][level] message" lines.
_formatter = logging.Formatter(
    "[%(asctime)s][%(name)s][%(levelname)s] %(message)s",
    "%Y-%m-%d %H:%M:%S",
)
class LogHandler(logging.Handler):
    """Logging handler that writes formatted records to a stream.

    A log file is opened in append mode at construction time and closed by
    ``close()``, but records are currently only written to the stream.

    Args:
        stream: object with a ``write`` method (e.g. ``sys.stderr``).
        filepath: path of the log file opened in append mode.
        level: minimum level a record needs in order to be emitted.
    """

    def __init__(self, stream, filepath, level):
        super().__init__(level)
        self.stream = stream
        self.file = open(filepath, mode="a")
        self.terminator = "\n"
        self.formatter = _formatter

    def close(self):
        """Flush the stream and the log file (when supported), then close the file."""
        if hasattr(self.stream, "flush"):
            self.stream.flush()
        if hasattr(self.file, "flush"):
            self.file.flush()
        self.file.close()

    def emit(self, record):
        """Write ``record`` to the stream if it meets the handler's level."""
        if record.levelno >= self.level:
            line = self.format(record) + self.terminator
            self.stream.write(line)
            # NOTE: file output is intentionally disabled. To re-enable it,
            # write `line` to self.file here and flush it.
# Route all logging through a single LogHandler attached to the root logger.
root_log = logging.getLogger()
root_log.handlers.clear()
root_log.addHandler(LogHandler(sys.stderr, "/log", logging.DEBUG))
root_log.setLevel(logging.DEBUG)

log = logging.getLogger("TempSens")
log.info("Program start")

# Status LED (the pin named "LED"), switched off at start-up.
led = Pin("LED", Pin.OUT)
led.off()

# Switched output on pin 16, driven low initially.
switch_output = Pin(16, Pin.OUT, value=0)

# STAT_NO_IP isn't exposed via MicroPython
STAT_NO_IP = 2

# Map numeric WLAN status codes to their names for readable error messages.
STATUS = {
    getattr(network, attr): attr
    for attr in dir(network)
    if attr.startswith("STAT")
}
STATUS[STAT_NO_IP] = "STAT_NO_IP"
async def connect_network(sta_if, ssid, psk):
    """Join the given Wi-Fi network and then set the clock via NTP.

    Activates the station interface if needed. When the interface is not
    already connected with an IP, starts a connection and polls the status
    every 0.5 s (toggling the LED) until it leaves the connecting/idle
    states, then waits up to one minute for an IP address.

    Args:
        sta_if: station WLAN interface object.
        ssid: name of the network to join.
        psk: pre-shared key for the network.

    Raises:
        Exception: if no IP address is obtained within the timeout, or the
            interface is not connected once polling ends.
        Any error raised by ``ntptime.settime()`` propagates to the caller.
    """
    poll_interval = 0.5  # seconds between status polls
    ip_timeout = 60  # seconds to wait for an IP address
    max_ip_polls = int(ip_timeout / poll_interval)

    if not sta_if.active():
        # only enable if its not active
        log.info("Enabling sta")
        sta_if.active(True)

    if sta_if.isconnected() and sta_if.status() == network.STAT_GOT_IP:
        log.info(f"Already connected {sta_if.ifconfig()[0]}")
    else:
        log.info(f"Connecting to {ssid}")
        sta_if.connect(ssid, psk)

        status = sta_if.status()
        while status == network.STAT_CONNECTING or status == network.STAT_IDLE:
            # We are still connecting, sleep a bit
            led.toggle()
            await asyncio.sleep(poll_interval)
            status = sta_if.status()

        polls = 0
        while status == STAT_NO_IP:
            # Okay, maybe the DHCP server is taking a bit longer...
            if polls >= max_ip_polls:
                # Yeah no, its been over a minute
                # We aren't getting an IP
                raise Exception("Unable to connect to Network:", "Failed to get IP")
            if polls == 0:
                log.info("Waiting for IP...")
            led.toggle()
            await asyncio.sleep(poll_interval)
            polls += 1
            status = sta_if.status()

        if sta_if.isconnected():
            log.info(f"Connected ({sta_if.ifconfig()[0]})")
        else:
            status = sta_if.status()
            # Fall back to the raw code so an unmapped status cannot raise
            # KeyError and hide the actual connection failure.
            raise Exception(
                "Unable to connect to Network:",
                STATUS.get(status, f"unknown status {status}")
            )

    log.info("Setting time")
    ntptime.settime()
def switch_on():
    """Turn on the LED, then the switched output."""
    for pin in (led, switch_output):
        pin.on()
def switch_off():
    """Turn off the LED, then the switched output."""
    for pin in (led, switch_output):
        pin.off()
def switch_toggle():
    """Invert the LED, then the switched output."""
    for pin in (led, switch_output):
        pin.toggle()
async def parse_data(result_data):
    """Read the battery level from ``result_data`` and drive the switch.

    Scans ``result_data["list"]`` for the first entry whose ``data_name`` is
    ``I18N_COMMON_BATTERY_SOC`` and converts its ``data_value`` to a float.
    The switch is turned on at 98 or above and off at 88 or below; values in
    between leave it unchanged. If no matching entry exists the level is
    taken as -1, which switches the output off.
    """
    soc_key = "I18N_COMMON_BATTERY_SOC"
    on_threshold = 98
    off_threshold = 88

    for entry in result_data["list"]:
        if entry["data_name"] == soc_key:
            battery_level = float(entry["data_value"])
            break
    else:
        # No battery reading in this message.
        battery_level = -1

    if battery_level >= on_threshold:
        switch_on()
    elif battery_level <= off_threshold:
        switch_off()

    log.info(f"Battery level: {battery_level}")
async def button_handler():
    """Poll the BOOTSEL button forever and toggle the switch on each press.

    The switch is toggled on the released-to-pressed transition only, and
    the new output value is logged.
    """
    # Poll on a fixed interval rather than every scheduler pass: each
    # bootsel_button() call briefly disables flash access and interrupts, so
    # busy-polling starves the rest of the system. The interval also acts as
    # a simple debounce.
    poll_interval = 0.02  # seconds

    was_pressed = 0
    while True:
        pressed = bootsel_button()
        if pressed != was_pressed:
            if pressed:
                switch_toggle()
                log.info("Button Press: " + str(switch_output.value()))
            was_pressed = pressed
        await asyncio.sleep(poll_interval)
async def temp_handler(
    ssid="easybox1",
    psk="Zwiebelkuchen2001",
    ws_url="ws://192.168.178.65:8082/ws/home/overview"
):
    """Keep Wi-Fi and the WebSocket alive and feed battery data to parse_data.

    Runs forever. Each outer iteration makes sure the station interface is
    connected, opens a WebSocket to ``ws_url``, then repeatedly sends the
    battery request, reads the JSON reply and passes ``reply["result_data"]``
    to ``parse_data``. Any exception is logged and the cycle restarts after a
    short delay.

    Args:
        ssid: Wi-Fi network name.
        psk: Wi-Fi pre-shared key.
        ws_url: WebSocket endpoint to query.
    """
    # SECURITY NOTE(review): the default credentials above are stored in
    # plain text in the source; consider loading them from a config file that
    # is kept out of version control.
    ws_request = {"lang": "en_us", "dev_id": "1", "service": "real_battery"}
    retry_delay = 1  # seconds to wait after a failure before retrying

    sta_if = network.WLAN(network.STA_IF)
    while True:
        try:
            # LED is on while the network connection is checked/established.
            led.on()
            if not sta_if.isconnected() or sta_if.status() != network.STAT_GOT_IP:
                await connect_network(sta_if, ssid, psk)
        except Exception as e:
            log.exception(e)
            await asyncio.sleep(retry_delay)
            continue

        # Restore the LED so it mirrors the switched output again.
        led.value(switch_output.value())

        try:
            async with aiohttp.ClientSession() as session:
                log.info("Client Session started")
                async with session.ws_connect(ws_url) as ws:
                    log.info("Connection to socket established")
                    # This loop only ends through an exception, which is
                    # handled below.
                    while True:
                        await ws.send_json(ws_request)
                        await asyncio.sleep(1)  # sleep to ensure the data will arrive
                        # Toggle led twice to signal we got something
                        led.toggle()
                        data = await ws.receive_json()
                        led.toggle()
                        await parse_data(data["result_data"])
                        await asyncio.sleep(10)
        except Exception as e:
            log.exception(e)
            # Back off before reconnecting so an unreachable server does not
            # cause a hot retry loop that floods the log.
            await asyncio.sleep(retry_delay)
async def main():
    """Start the button and battery handlers as tasks, then idle forever."""
    for start_handler in (button_handler, temp_handler):
        asyncio.create_task(start_handler())

    # Keep the event loop alive; the handlers do all the work.
    while True:
        await asyncio.sleep(3600)
# Start the event loop. main() loops forever, so this call does not return
# under normal operation.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment