Created
July 16, 2024 18:43
-
-
Save Jan200101/ba0545e5670084547a8b9d6a077f32df to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from micropython import const | |
from machine import Pin | |
from rp2 import bootsel_button | |
import network | |
import aiohttp | |
import asyncio | |
import logging | |
import sys | |
import ntptime | |
_level = logging.DEBUG | |
_formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") | |
class LogHandler(logging.Handler): | |
def __init__(self, stream, filepath, level): | |
super().__init__(level) | |
self.stream = stream | |
self.file = open(filepath, mode="a") | |
print(self.file) | |
self.terminator = "\n" | |
self.formatter = _formatter | |
def close(self): | |
if hasattr(self.stream, "flush"): | |
self.stream.flush() | |
if hasattr(self.file, "flush"): | |
self.file.flush() | |
self.file.close() | |
def emit(self, record): | |
if record.levelno >= self.level: | |
m = self.format(record) + self.terminator | |
self.stream.write(m) | |
#self.file.write(m) | |
#self.file.flush() | |
root_log = logging.getLogger() | |
root_log.handlers.clear() | |
root_log.addHandler(LogHandler(sys.stderr, "/log", logging.DEBUG)) | |
root_log.setLevel(logging.DEBUG) | |
log = logging.getLogger("TempSens") | |
log.info("Program start") | |
led = Pin("LED", Pin.OUT) | |
led.off() | |
switch_output = Pin(16, Pin.OUT, value=0) | |
# STAT_NO_IP isn't exposed via MicroPython | |
STATUS = {getattr(network, x):x for x in dir(network) if x.startswith("STAT")} | |
STAT_NO_IP = 2 | |
STATUS.update({ | |
STAT_NO_IP: "STAT_NO_IP" | |
}) | |
async def connect_network(sta_if, ssid, psk): | |
if not sta_if.active(): | |
# only enable if its not active | |
log.info("Enabling sta") | |
sta_if.active(True) | |
if not sta_if.isconnected() or sta_if.status() != network.STAT_GOT_IP: | |
log.info(f"Connecting to {ssid}") | |
sta_if.connect(ssid, psk) | |
status = sta_if.status() | |
while status == network.STAT_CONNECTING or status == network.STAT_IDLE: | |
# We are still connecting, sleep a bit | |
led.toggle() | |
await asyncio.sleep(0.5) | |
status = sta_if.status() | |
c = 0 | |
while status == STAT_NO_IP: | |
# Okay, maybe the DHCP server is taking a bit longer... | |
if c > 60: | |
# Yeah no, its been over a minute | |
# We aren't getting an IP | |
raise Exception("Unable to connect to Network:", "Failed to get IP") | |
elif c == 0: | |
log.info("Waiting for IP...") | |
led.toggle() | |
await asyncio.sleep(0.5) | |
c += 1 | |
status = sta_if.status() | |
if sta_if.isconnected(): | |
log.info(f"Connected ({sta_if.ifconfig()[0]})") | |
else: | |
raise Exception("Unable to connect to Network:", STATUS[sta_if.status()]) | |
else: | |
log.info(f"Already connected {sta_if.ifconfig()[0]}") | |
log.info("Setting time") | |
ntptime.settime() | |
def switch_on(): | |
led.on() | |
switch_output.on() | |
def switch_off(): | |
led.off() | |
switch_output.off() | |
def switch_toggle(): | |
led.toggle() | |
switch_output.toggle() | |
async def parse_data(result_data): | |
LEVEL_START = 98 | |
LEVEL_END = 88 | |
battery_level = -1 | |
for l in result_data["list"]: | |
if l["data_name"] == "I18N_COMMON_BATTERY_SOC": | |
battery_level = float(l["data_value"]) | |
break | |
if battery_level >= LEVEL_START: | |
switch_on() | |
elif battery_level <= LEVEL_END: | |
switch_off() | |
log.info(f"Battery level: {battery_level}",) | |
async def button_handler(): | |
last_state = 0 | |
while True: | |
state = bootsel_button() | |
if state != last_state: | |
if state: | |
switch_toggle() | |
log.info("Button Press: " + str(switch_output.value())) | |
last_state = state | |
await asyncio.sleep(0) | |
async def temp_handler(): | |
ws_url = const("ws://192.168.178.65:8082/ws/home/overview") | |
ws_request = {"lang":"en_us","dev_id":"1","service":"real_battery"} | |
sta_if = network.WLAN(network.STA_IF) | |
while True: | |
try: | |
led.on() | |
if not sta_if.isconnected() or sta_if.status() != network.STAT_GOT_IP: | |
await connect_network(sta_if, "easybox1", "Zwiebelkuchen2001") | |
except Exception as e: | |
log.exception(e) | |
await asyncio.sleep(1) | |
continue | |
led.value(switch_output.value()) | |
try: | |
async with aiohttp.ClientSession() as session: | |
log.info("Client Session started") | |
async with session.ws_connect(ws_url) as ws: | |
log.info("Connection to socket established") | |
while True: | |
await ws.send_json(ws_request) | |
await asyncio.sleep(1) # sleep to ensure the data will arrive | |
# Toggle led twice to signal we got something | |
led.toggle() | |
data = await ws.receive_json() | |
led.toggle() | |
await parse_data(data["result_data"]) | |
await asyncio.sleep(10) | |
log.error("Connection Interrupted, restarting...") | |
continue | |
log.error("Session Interrupted, restarting...") | |
continue | |
except Exception as e: | |
log.exception(e) | |
await asyncio.sleep(0) | |
continue | |
async def main(): | |
asyncio.create_task(button_handler()) | |
asyncio.create_task(temp_handler()) | |
while True: | |
await asyncio.sleep(3600) | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment