Last active
August 5, 2024 12:17
-
-
Save toxicantidote/0d4f10288d91943c4ac1196b3c2ad667 to your computer and use it in GitHub Desktop.
Fridge GUI/control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import board | |
from micropython import const as S | |
## Screen parameters | |
LCD_WIDTH = S(130) | |
LCD_HEIGHT = S(64) | |
LCD_ROTATION = S(0) | |
## Button thresholds | |
THRESHOLD_BLUE = S(8000) | |
THRESHOLD_RED = S(50000) | |
## Button repeat rate (sec) | |
REPEAT_BUTTON = S(0.25) | |
## Minimum/maximum allowed settable temperature | |
TEMPERATURE_MIN = S(-20) | |
TEMPERATURE_MAX = S(10) | |
## Minimum and maximum tempertuares considered 'sane' values. Values outside of | |
## this range will be rejected as invalid | |
TEMPERATURE_SANE_MIN = S(-30) | |
TEMPERATURE_SANE_MAX = S(60) | |
## If the temperature differs from the 5min average by more than this, | |
## consider an NTC fault | |
TEMPERATURE_RATE = S(10) | |
## SPI pins | |
PIN_SPI_MOSI = S(board.GP19) | |
PIN_SPI_CLK = S(board.GP18) | |
## Screen pins | |
PIN_LCD_CS = S(board.GP22) | |
PIN_LCD_DC = S(board.GP21) | |
PIN_LCD_RESET = S(board.GP20) | |
## NTC pin | |
PIN_NTC = S(board.GP27) | |
## DS18x20 pin | |
PIN_TEMP_PROBE = S(board.GP1) | |
## Buzzer pin | |
PIN_BUZZ = S(board.GP6) | |
## Buttons input | |
PIN_BUTTON = S(board.GP26) | |
## UART to fridge fault/compressor board (Arduino) | |
PIN_UART_TX = S(board.GP12) | |
PIN_UART_RX = S(board.GP13) | |
UART_BAUD = S(9600) | |
## NTC thermistor resistance and temperatuire pairs (ohms/celsius) | |
CALIBRATION_NTC_A_RESISTANCE = S(24700) | |
CALIBRATION_NTC_A_CELSIUS = S(8) | |
CALIBRATION_NTC_B_RESISTANCE = S(10000) | |
CALIBRATION_NTC_B_CELSIUS = S(25) | |
CALIBRATION_NTC_C_RESISTANCE = S(1003) | |
CALIBRATION_NTC_C_CELSIUS = S(91) | |
## value of the other resistor in the voltage divider with the NTC | |
CALIBRATION_NTC_DIVIDER = S(67200) | |
## offset in degrees for the NTC value | |
CALIBRATION_NTC_OFFSET = S(0) | |
### | |
from adafruit_displayio_sh1106 import SH1106 as SPI_SCREEN | |
from adafruit_display_text import bitmap_label as label | |
from adafruit_display_shapes import rect | |
import displayio | |
import terminalio | |
import wifi | |
import math | |
import analogio | |
import time | |
import gc | |
import random | |
import digitalio | |
import socketpool | |
import supervisor | |
import busio | |
import re | |
import traceback | |
try: | |
#try: | |
# with open('temperature', 'r') as fp: | |
# set_temperature = int(fp.read()) | |
# print('Using saved temperature ' + str(set_temperature) + 'c') | |
#except: | |
# set_temperature = 3 | |
# print('Using default temperature ' + str(set_temperature) + 'c') | |
set_temperature = 3 | |
secret_source = None | |
compressor_speed = 0 | |
compressor_speed_last_set = 0 | |
wifi_connected = False | |
splash = None | |
p_ntc = None | |
ssid = password = '' | |
sock = udp_host = udp_port = None | |
last_compressor_comms = time.time() | |
temp_avg_last = 0 | |
temp_avg_list = dict() | |
error_code = 0 | |
error_time = 0 | |
p_button = analogio.AnalogIn(PIN_BUTTON) | |
p_buzz = digitalio.DigitalInOut(PIN_BUZZ) | |
p_buzz.direction = digitalio.Direction.OUTPUT | |
p_buzz.value = False | |
uart = None | |
## we give NTC's their own class so we can set properties on them. this is | |
## mostly just a wrapper around an analogio.AnalogIn object | |
class NTC(): | |
def __init__(self, port, a_r, a_c, b_r, b_c, c_r, c_c, divider, offset): | |
self.ain = analogio.AnalogIn(port) | |
self.divider_resistor = divider | |
self.offset = offset | |
self.coefficient_a, self.coefficient_b, self.coefficient_c = self.calculate_coefficient(a_r, a_c, b_r, b_c, c_r, c_c) | |
@property | |
def value(self): | |
return self.ain.value | |
@property | |
def reference_voltage(self): | |
return self.ain.reference_voltage | |
def temperature(self): | |
samples = [] | |
for i in range(5): | |
samples.append(self._temperature_single()) | |
#time.sleep(0.1) | |
return (sum(samples)/len(samples)) | |
def _temperature_single(self): | |
## work out the resistance of the thermistor | |
volt = self.reference_voltage - (self.reference_voltage * (self.ain.value / 65535)) | |
current = (self.reference_voltage - volt) / self.divider_resistor | |
r_ntc = volt / current | |
## work out the temperatures of the thermistor in degrees | |
temp = (1/(self.coefficient_a+self.coefficient_b*math.log(r_ntc)+self.coefficient_c*math.pow(math.log(r_ntc),3))-273.15) + self.offset | |
return temp | |
def calculate_coefficient(self, a_r, a_c, b_r, b_c, c_r, c_c): | |
## convert to kelvin | |
a_k = a_c + 273.15 | |
b_k = b_c + 273.15 | |
c_k = c_c + 273.15 | |
## calculate coefficients | |
c = ((1/a_k-1/b_k)-(math.log(a_r)- math.log(b_r))*(1/a_k-1/c_k)/(math.log(a_r)-math.log(c_r)))/((math.pow(math.log(a_r),3)-math.pow(math.log(b_r),3)) - (math.log(a_r)-math.log(b_r))*(math.pow(math.log(a_r),3)-math.pow(math.log(c_r),3))/(math.log(a_r)-math.log(c_r))) | |
b = ((1/a_k-1/b_k)-c*(math.pow(math.log(a_r),3)-math.pow(math.log(b_r),3)))/(math.log(a_r)-math.log(b_r)) | |
a = 1/a_k-c*(math.log(a_r))*(math.log(a_r))*(math.log(a_r))-b*math.log(a_r) | |
return a, b, c | |
def screen_current_temp(current, target, compressor_is_holding): | |
global compressor_speed | |
screen_ct = displayio.Group() | |
screen_ct.append(label.Label(terminalio.FONT, text = str(current) + 'c', scale = 4, x = 20, y = 20)) | |
screen_ct.append(label.Label(terminalio.FONT, text = 'Set ' + str(target) + 'c', scale = 2, x = 5, y = 45)) | |
if compressor_is_holding == True: hold_text = ' (H)' | |
else: hold_text = '' | |
if compressor_speed == 0: screen_ct.append(label.Label(terminalio.FONT, text = 'Compressor: Off' + hold_text, x = 5, y = 60)) | |
elif compressor_speed == 1: screen_ct.append(label.Label(terminalio.FONT, text = 'Compressor: Slow' + hold_text, x = 5, y = 60)) | |
elif compressor_speed == 2: screen_ct.append(label.Label(terminalio.FONT, text = 'Compressor: Fast' + hold_text, x = 5, y = 60)) | |
return screen_ct | |
def set_compressor_speed(speed = 0): | |
#global p_relay_c1, p_relay_c2 | |
global uart | |
global compressor_speed, compressor_speed_last_set | |
## Speeds: | |
## 0 = off | |
## 1 = low speed (2000rpm) | |
## 2 = high speed (3500rpm) | |
if speed < 0 or speed > 2: | |
print('Invalid compressor speed') | |
return False | |
if speed != compressor_speed: | |
if (time.time() - compressor_speed_last_set) < 60: | |
print('Ignored compressor speed set. Too soon!') | |
return True | |
else: | |
compressor_speed_last_set = time.time() | |
compressor_speed = speed | |
if speed == 0: | |
#print('Compressor off') | |
uart.write(bytes("0", 'utf-8')) | |
elif speed == 1: | |
#print('Compressor low speed') | |
uart.write(bytes("1", 'utf-8')) | |
elif speed == 2: | |
#print('Compressor high speed') | |
uart.write(bytes("2", 'utf-8')) | |
return False | |
def check_error(): | |
global uart, last_compressor_comms, error_code, error_time | |
try: | |
uart.write(bytes('S', 'utf-8')) | |
rx_data = re.sub('\n', '', uart.read(64).decode()) | |
except: | |
print('UART comms exception') | |
return 12 | |
if rx_data != '': last_compressor_comms = time.time() | |
re_error = re.search(r'STATUS (ERROR|OKAY)\s?(\d)?', rx_data) | |
if re_error: | |
error_status = re_error.group(1) | |
if error_status == 'OKAY': error_code = 0 | |
else: error_code = re_error.group(2) | |
error_time = time.time() | |
return error_code | |
else: | |
## keep errors displayed for 2mins since last reported by compressor control | |
if (time.time() - error_time) < 120: return error_code | |
else: return 0 | |
def show_error(flashes): | |
if flashes == 0: return | |
error_widgets = displayio.Group() | |
error_widgets.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF)) | |
## compressor fault codes | |
if flashes == 1: | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nBattery low', x = 5, y = 10, color = 0xFFFFFF)) | |
elif flashes == 2: | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nFan over-current', x = 5, y = 10, color = 0xFFFFFF)) | |
elif flashes == 3: | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nRefrigerant block', x = 5, y = 10, color = 0xFFFFFF)) | |
elif flashes == 4: | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nCompressor overload', x = 5, y = 10, color = 0xFFFFFF)) | |
elif flashes == 5: | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nCompressor too hot', x = 5, y = 10, color = 0xFFFFFF)) | |
## our fault codes | |
elif flashes == 10: ## temperature changed too fast | |
error_widgets.append(label.Label(terminalio.FONT, text = 'Rapid temperature\nchange, check probe.', x = 5, y = 10, color = 0xFFFFFF)) | |
elif flashes == 11: ## temperature outside sane values | |
error_widgets.append(label.Label(terminalio.FONT, text = 'Impossible temperature\n check probe.', x = 5, y = 10, color = 0xFFFFFF)) | |
elif flashes == 12: ## compressor controller (arduino) unresponsive | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nController offline', x = 5, y = 10, color = 0xFFFFFF)) | |
## anything else | |
else: | |
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nFault code ' + str(flashes), x = 5, y = 10, color = 0xFFFFFF)) | |
return error_widgets | |
## scans wireless networks | |
def scan_wifi(splash, x_status, y_offset): | |
print('Wi-Fi scan: Scanning') | |
y_offset_original = y_offset | |
splash.append(label.Label(terminalio.FONT, text = 'Wi-Fi', color = 0xFFFFFF, x = 5, y = y_offset)) | |
label_scanning = label.Label(terminalio.FONT, text = '[SCAN]', color = 0xFFFFFF, x = x_status, y = y_offset) | |
splash.append(label_scanning) | |
networks_s = wifi.radio.start_scanning_networks() | |
networks = [] | |
for net in networks_s: | |
networks.append([net.ssid, net.channel, net.rssi]) | |
networks.sort(key=lambda x: x[2], reverse=False) | |
print('Wi-Fi scan: Sorting') | |
net_labels = [] | |
y_offset += 10 | |
displayed_nets = [] | |
for net in networks: | |
if net[0] == '': continue ## ignore hidden nets | |
if net[0] in displayed_nets: continue ## ignore duplicates (CP bug #7318) | |
else: displayed_nets.append(net[0]) | |
if len(displayed_nets) >= 4: break ## limit net count | |
label_net = label.Label(terminalio.FONT, text = net[0], color = 0xFFFFFF, x = 10, y = y_offset) | |
net_labels.append(label_net) | |
splash.append(label_net) | |
y_offset += 10 | |
hidden = len(networks) - len(net_labels) | |
if hidden > 0: | |
label_net = label.Label(terminalio.FONT, text = '+' + str(hidden) + ' other networks', color = 0xFFFFFF, x = 10, y = y_offset) | |
net_labels.append(label_net) | |
splash.append(label_net) | |
print('Wi-Fi scan: Waiting') | |
try: splash.remove(label_scanning) | |
except: pass | |
label_continue_new = None | |
for i in range(3): | |
label_continue_new = label.Label(terminalio.FONT, text = '[Wait ' + str(3-i) + ']', color = 0xFFFFFF, x = x_status, y = y_offset_original) | |
splash.append(label_continue_new) | |
try: splash.remove(label_continue) | |
except: pass | |
label_continue = label_continue_new | |
del label_continue_new | |
time.sleep(1) | |
for lbl in net_labels: | |
try: splash.remove(lbl) | |
except: pass | |
try: splash.remove(label_continue) | |
except: pass | |
print('Wi-Fi scan: Done!') | |
## connect to wifi network | |
def connect_wifi(splash, x_status, y_offset, y_secret, label_secret_source, ssid, password): | |
global wifi_connected | |
if ssid == None or password == None: | |
print('Wi-Fi connect: No secrets available') | |
if splash != None: | |
label_connect = label.Label(terminalio.FONT, text = '[NO SECRETS]', color = 0xFFFFFF, x = x_status, y = y_offset) | |
splash.append(label_connect) | |
return | |
for i in range(5): | |
print('Wi-Fi connect: Connecting') | |
if splash != None: | |
label_connect = label.Label(terminalio.FONT, text = '[ATTEMPT ' + str(i+1) + '/5]', color = 0xFFFFFF, x = (x_status-30), y = y_offset) | |
splash.append(label_connect) | |
label_ssid = label.Label(terminalio.FONT, text = 'SSID: ' + ssid, color = 0xFFFFFF, x = 10, y = y_offset + 10) | |
splash.append(label_ssid) | |
try: | |
wifi.radio.connect(ssid=ssid, password=password) | |
try: splash.remove(label_connect) | |
except: pass | |
print('Wi-Fi connect: Connected') | |
init_udp() | |
if splash != None: | |
label_connect = label.Label(terminalio.FONT, text = '[OK]', color = 0xFFFFFF, x = (x_status-20), y = y_offset) | |
splash.append(label_connect) | |
print("\tIP:", wifi.radio.ipv4_address) | |
print("\tNetmask:", wifi.radio.ipv4_subnet) | |
if splash != None: | |
splash.append(label.Label(terminalio.FONT, text = 'IP: ' + str(wifi.radio.ipv4_address), color = 0xFFFFFF, x = 10, y = y_offset + 20)) | |
splash.append(label.Label(terminalio.FONT, text = 'NM: ' + str(wifi.radio.ipv4_subnet), color = 0xFFFFFF, x = 10, y = y_offset + 30)) | |
wifi_connected = True | |
break | |
except: | |
wifi_connected = False | |
try: | |
splash.remove(label_connect) | |
splash.remove(label_ssid) | |
except: pass | |
print('Wi-Fi connect: Connection failed') | |
if splash != None: | |
label_connect = label.Label(terminalio.FONT, text = '[FAIL]', color = 0xFFFFFF, x = (x_status-20), y = y_offset) | |
splash.append(label_connect) | |
if splash == None or i == 4: return | |
time.sleep(0.5) | |
for i in range(0, 3): | |
lc_old = label_connect | |
print('\tRetrying in ' + str(3-i) + '..') | |
label_connect = label.Label(terminalio.FONT, text = '[RETRY ' + str(3-i) + ']', color = 0xFFFFFF, x = (x_status - 20), y = y_offset) | |
try: splash.remove(lc_old) | |
except: pass | |
splash.append(label_connect) | |
time.sleep(1) | |
try: | |
splash.remove(label_secret_source) | |
splash.remove(label_connect) | |
except: pass | |
if get_button() == 'blue': ## pressed blue | |
print('\tForced backup secrets due to button press') | |
label_secret_source = label.Label(terminalio.FONT, text = '[BACKUP]', color = 0xFFFFFF, x = x_status, y = y_secret) | |
try: | |
from secrets import backup as secrets | |
ssid = secrets['ssid'] | |
password = secrets['password'] | |
except: | |
print('Backup secrets missing') | |
return | |
else: | |
print('\tRetrying with main secrets') | |
label_secret_source = label.Label(terminalio.FONT, text = '[MAIN]', color = 0xFFFF, x = x_status, y = y_secret) | |
splash.append(label_secret_source) | |
print('Wi-Fi connect: Done!') | |
def beep(length = 0.1, repeat = 1): | |
global p_buzz | |
for i in range(repeat): | |
p_buzz.value = True | |
time.sleep(length) | |
p_buzz.value = False | |
if repeat > 1: time.sleep(length) | |
## get button state | |
def get_button(): | |
global p_button | |
if p_button.value > THRESHOLD_RED: | |
beep() | |
return 'red' | |
elif p_button.value < THRESHOLD_BLUE: | |
beep() | |
return 'blue' | |
else: | |
return str(p_button.value) | |
def init_secret(): | |
bp = get_button() | |
secret_source = None | |
## blue button forces backup secrets | |
if bp == 'blue': | |
print('Secrets: Forced backup secrets due to button press') | |
try: | |
from secrets import backup as secrets | |
secret_source = 'Backup' | |
except: | |
pass | |
## otherwise try main secrets first | |
else: | |
print('Secrets: Looking for main secrets') | |
try: | |
from secrets import secrets | |
secret_source = 'Main' | |
except: | |
try: | |
print('Secrets: Main secret failed. Trying backup secrets') | |
from secrets import backup as secrets | |
secret_source = 'Primary' | |
except: | |
pass | |
if secret_source == None: | |
print('Secrets: No valid secrets found') | |
else: | |
try: | |
return secret_source, secrets['ssid'], secrets['password'] | |
except: | |
print('Secrets: Found secrets file, but format was invalid. Must have ssid and password keys') | |
print('Secrets: Returning blank data') | |
return None, None, None | |
def init_system(): | |
global PIN_SPI_CLK, PIN_SPI_MOSI | |
global ssid, password, secret_source | |
global PIN_LCD_DC, PIN_LCD_CS, PIN_LCD_RESET, LCD_WIDTH, LCD_HEIGHT, LCD_ROTATION, splash | |
global p_ntc, PIN_NTC, CALIBRATION_INTERNAL_NTC_A_RESISTANCE, CALIBRATION_INTERNAL_NTC_A_CELSIUS, CALIBRATION_INTERNAL_NTC_B_RESISTANCE, CALIBRATION_INTERNAL_NTC_B_CELSIUS, CALIBRATION_INTERNAL_NTC_C_RESISTANCE, CALIBRATION_INTERNAL_NTC_C_CELSIUS, CALIBRATION_NTC_DIVIDER, CALIBRATION_NTC_OFFSET | |
global PIN_UART_TX, PIN_UART_RX, UART_BAUD | |
global uart | |
print('Init: Release hardware') | |
try: | |
displayio.release_displays() | |
w_radio.stop_scanning_networks() | |
except: pass | |
## SPI init | |
print('Init: SPI') | |
spi = busio.SPI(PIN_SPI_CLK, MOSI = PIN_SPI_MOSI) | |
## Display init | |
print('Init: Display bus') | |
display_bus = displayio.FourWire(spi, command = PIN_LCD_DC, chip_select = PIN_LCD_CS, reset = PIN_LCD_RESET, baudrate = 1000000) | |
print('Init: Display parameters') | |
display = SPI_SCREEN(display_bus, width = LCD_WIDTH, height = LCD_HEIGHT, rotation = LCD_ROTATION) | |
print('Init: Display rendering') | |
splash = displayio.Group() | |
display.show(splash) | |
## status messages | |
x_status = 80 | |
print('Init: Display messages') | |
splash.append(label.Label(terminalio.FONT, text = 'Display', color = 0xFFFFFF, x = 5, y = 10)) | |
splash.append(label.Label(terminalio.FONT, text = '[OK]', color = 0x00FF00, x = x_status, y = 10)) | |
## configure external NTC thermistor | |
print('Init: NTC external') | |
splash.append(label.Label(terminalio.FONT, text = 'External NTC', color = 0xFFFFFF, x = 5, y = 20)) | |
p_ntc = NTC(PIN_NTC, CALIBRATION_NTC_A_RESISTANCE, CALIBRATION_NTC_A_CELSIUS, CALIBRATION_NTC_B_RESISTANCE, CALIBRATION_NTC_B_CELSIUS, CALIBRATION_NTC_C_RESISTANCE, CALIBRATION_NTC_C_CELSIUS, CALIBRATION_NTC_DIVIDER, CALIBRATION_NTC_OFFSET) | |
temp = p_ntc.temperature() | |
print('\tTemperature: ' + str(temp) + 'c') | |
if (temp < TEMPERATURE_SANE_MIN) or (temp > TEMPERATURE_SANE_MAX): | |
splash.append(label.Label(terminalio.FONT, text = '!' + str(int(temp)) + 'c !', color = 0xFFFFFF, x = x_status, y = 20)) | |
print('\t\tValue not sane!') | |
else: | |
splash.append(label.Label(terminalio.FONT, text = '[' + str(int(temp)) + 'c]', color = 0xFFFFFF, x = x_status, y = 20)) | |
print('\t\tValue ok') | |
## UART | |
splash.append(label.Label(terminalio.FONT, text = 'UART', color = 0xFFFFFF, x = 5, y = 30)) | |
try: | |
uart = busio.UART(PIN_UART_TX, PIN_UART_RX, baudrate=UART_BAUD, timeout = 0.25) | |
splash.append(label.Label(terminalio.FONT, text = '[OK]', color = 0xFFFFFF, x = x_status, y = 30)) | |
except: | |
splash.append(label.Label(terminalio.FONT, text = '[FAIL]', color = 0xFFFFFF, x = x_status, y = 30)) | |
## set initial compressor speed | |
splash.append(label.Label(terminalio.FONT, text = 'Compressor', color = 0xFFFFFF, x = 5, y = 40)) | |
if (temp - set_temperature) < 2: | |
set_compressor_speed(0) | |
splash.append(label.Label(terminalio.FONT, text = '[LOW]', color = 0x0000FF, x = x_status, y = 40)) | |
elif (temp - set_temperature) < 4: | |
set_compressor_speed(1) | |
splash.append(label.Label(terminalio.FONT, text = '[MID]', color = 0x00FF00, x = x_status, y = 40)) | |
else: | |
set_compressor_speed(2) | |
splash.append(label.Label(terminalio.FONT, text = '[MAX]', color = 0xFF0000, x = x_status, y = 40)) | |
## new screen | |
time.sleep(1) | |
splash_old = splash | |
del splash | |
splash = displayio.Group() | |
display.show(splash) | |
## get secrets | |
splash.append(label.Label(terminalio.FONT, text = 'Secrets', color = 0xFFFFFF, x = 5, y = 5)) | |
secret_source, ssid, password = init_secret() | |
if secret_source is None: | |
splash.append(label.Label(terminalio.FONT, text = '[FAILED]', color = 0xFFFFFF, x = x_status, y = 5)) | |
return | |
else: | |
label_secret_source = label.Label(terminalio.FONT, text = '[' + secret_source + ']', color = 0xFFFFFF, x = x_status, y = 5) | |
splash.append(label_secret_source) | |
## wireless scan | |
print('Init: Wi-Fi scan') | |
scan_wifi(splash, x_status, 15) | |
## wireless connect | |
print('Init: Wi-Fi connect') | |
connect_wifi(splash, x_status, 15, 5, label_secret_source, ssid, password) | |
if wifi_connected == False: | |
print('Gave up trying to connect to Wi-Fi. Will try again later') | |
splash.append(label.Label(terminalio.FONT, text = 'Will retry later', color = 0xFFFFFF, x = 5, y = 35)) | |
## countdown to start | |
label_countdown = None | |
print('Init: Start delay') | |
for i in range(3): | |
print('\t' + str(3-i) + '...') | |
label_countdown_new = label.Label(terminalio.FONT, text = 'Starting in ' + str(3-i) + 's..', color = 0xFFFFFF, x = 5, y = LCD_HEIGHT - 5) | |
try: splash.remove(label_countdown) | |
except: pass | |
splash.append(label_countdown_new) | |
label_countdown = label_countdown_new | |
time.sleep(1) | |
## new screen | |
display.root_group = None | |
del splash | |
splash = displayio.Group() | |
display.show(splash) | |
print('Init: Completed!') | |
def animate_snowflake(frame, x, y): | |
frame_content = displayio.Group() | |
x1, y1 = random.randint(x, x + 12), y + (frame * 3) + random.randint(1,3) | |
x2, y2 = random.randint(x + 10, x + 20), y + (frame * 3) + random.randint(1,4) | |
x3, y3 = random.randint(x + 18, x + 25), y + (frame * 3) + random.randint(1,3) | |
frame_content.append(label.Label(terminalio.FONT, text = 'X', x = x1, y = y1, color = 0xFFFFFF)) | |
frame_content.append(label.Label(terminalio.FONT, text = '+', x = x1, y = y1, color = 0xFFFFFF)) | |
frame_content.append(label.Label(terminalio.FONT, text = 'X', x = x2, y = y2, color = 0xFFFFFF)) | |
frame_content.append(label.Label(terminalio.FONT, text = '+', x = x2, y = y2, color = 0xFFFFFF)) | |
frame_content.append(label.Label(terminalio.FONT, text = 'X', x = x3, y = y3, color = 0xFFFFFF)) | |
frame_content.append(label.Label(terminalio.FONT, text = '+', x = x3, y = y3, color = 0xFFFFFF)) | |
return frame_content | |
def ipStrToBin(addrStr): | |
addrBin = 0 | |
for octet in addrStr.split("."): | |
addrBin = addrBin << 8 | int(octet) | |
return addrBin | |
def ipBinToStr(addrBin): | |
octetsStr = [] | |
for i in range(4): | |
octetsStr.append(str(addrBin>>(8*(3-i))&255)) | |
return ".".join(octetsStr) | |
def init_udp(): | |
global udp_host, udp_port, sock | |
print('UDP sender: Initialising') | |
## set up UDP sender | |
pool = socketpool.SocketPool(wifi.radio) | |
ipAddressBin = ipStrToBin(str(wifi.radio.ipv4_address)) | |
subnetMaskBin = ipStrToBin(str(wifi.radio.ipv4_subnet)) | |
networkAddressBin = ipAddressBin & subnetMaskBin | |
broadcastAddressBin = ipAddressBin | (subnetMaskBin ^ (1<<32)-1) | |
networkAddressStr = ipBinToStr(networkAddressBin) | |
udp_host = ipBinToStr(broadcastAddressBin) | |
udp_port = 5005 | |
sock = pool.socket(pool.AF_INET, pool.SOCK_DGRAM) # UDP, and we'l reuse it each time | |
sock.settimeout(2) | |
sock.setblocking(False) | |
print('UDP sender: Ready') | |
def send_message(message): | |
global sock, udp_host, udp_port | |
if wifi.radio.connected == False: | |
print('\tNot sending message. We are offline') | |
return | |
try: | |
b_message = bytes(str(message), 'utf-8') | |
sock.sendto(b_message, (udp_host,udp_port) ) # send UDP packet to udp_host:port | |
print('\tMessage sent') | |
except: | |
print('\tMessage send failed') | |
init_system() | |
old_screen = None | |
last_button = 0 | |
animate_frame = 0 | |
last_wifi_try = time.time() | |
temp_change = False | |
last_temp_change = time.time() | |
last_udp_send = 0 | |
boost_compressor = 0 | |
print('Starting event loop') | |
while True: | |
now = time.time() | |
## change temp on button presses | |
if (now - last_button) > REPEAT_BUTTON: | |
button_press = get_button() | |
if button_press == 'blue': | |
set_temperature += -1 | |
print('Decreasing temperature') | |
#temp_change = True | |
elif button_press == 'red': | |
set_temperature += 1 | |
print('Increasing temperature') | |
#temp_change = True | |
#if temp_change == True and (now - last_temp_change) > 3: | |
# last_temp_change = now | |
# temp_change = False | |
# | |
# try: | |
# storage.remount("/", readonly=False) | |
# with open('temperature', 'w') as fp: | |
# fp.write(str(set_temperature)) | |
# print('Saved temperature ' + str(set_temperature) + 'c to filesystem') | |
# except: | |
# print('Could not save temperature to filesystem') | |
# raise | |
# finally: | |
# storage.remount("/", readonly=True) | |
## keep temperature within allowed values | |
temperature_warning_elements = [] | |
if set_temperature < TEMPERATURE_MIN: | |
set_temperature = TEMPERATURE_MIN | |
temperature_warning_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF)) | |
temperature_warning_elements.append(label.Label(terminalio.FONT, text = 'Minimum temperature\nset!', x = 5, y = 10, color = 0xFFFFFF)) | |
elif set_temperature > TEMPERATURE_MAX: | |
set_temperature = TEMPERATURE_MAX | |
temperature_warning_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF)) | |
temperature_warning_elements.append(label.Label(terminalio.FONT, text = 'Maximum temperature\nset!', x = 5, y = 10, color = 0xFFFFFF)) | |
if len(temperature_warning_elements) > 0: | |
print('Temperature setting limit reached') | |
for e in temperature_warning_elements: | |
splash.append(e) | |
beep(0.25, 2) | |
time.sleep(1) | |
for e in temperature_warning_elements: | |
try: splash.remove(e) | |
except: pass | |
if wifi_connected == False and (now - last_wifi_try) > 30: | |
reconnect_elements = displayio.Group() | |
reconnect_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF)) | |
reconnect_elements.append(label.Label(terminalio.FONT, text = 'Retrying Wi-Fi\nconnection...', x = 5, y = 10, color = 0xFFFFFF)) | |
splash.append(reconnect_elements) | |
connect_wifi(None, 0, 0, 0, None, ssid, password) | |
try: splash.remove(reconnect_elements) | |
except: pass | |
reconnect_elements = displayio.Group() | |
reconnect_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF)) | |
if wifi_connected is True: | |
reconnect_elements.append(label.Label(terminalio.FONT, text = 'Connection ok\nIP ' + str(wifi.radio.ipv4_address), x = 5, y = 10, color = 0xFFFFFF)) | |
else: | |
reconnect_elements.append(label.Label(terminalio.FONT, text = 'Connection fail\nRetry in 30s..', x = 5, y = 10, color = 0xFFFFFF)) | |
splash.append(reconnect_elements) | |
last_wifi_try = now | |
time.sleep(1) | |
try: splash.remove(reconnect_elements) | |
except: pass | |
fc = check_error() | |
## get the temp, but do compressor speed based on 1min average | |
temp = int(p_ntc.temperature()) | |
if (now - temp_avg_last) > 5: | |
temp_avg_list[now] = temp | |
temp_avg_last = now | |
for t in temp_avg_list.keys(): | |
if (now - t) > 60: | |
del temp_avg_list[t] | |
avg_temp_list = [] | |
for ts in temp_avg_list.keys(): | |
avg_temp_list.append(temp_avg_list[ts]) | |
avg_temp = int(sum(avg_temp_list) / len(avg_temp_list)) | |
if (temp > avg_temp) and (compressor_speed != 0) and ((now - compressor_speed_last_set) > 30): | |
print('Temperature is increasing while compressor is running. Suspect high thermal load. Boosting compressor speed for 5mins.') | |
boost_compressor = now + 300 | |
## if the temperature is changing too fast, the sensor may be faulty | |
if (temp - avg_temp) > TEMPERATURE_RATE or (avg_temp - temp) > TEMPERATURE_RATE: | |
print('Is the fridge closed? Temperature average is ' + str(avg_temp) + 'c but last reading was ' + str(temp) + 'c. Max allowed deviation is ' + str(TEMPERATURE_RATE) + 'c.') | |
fc = 10 | |
## if the temperature is outside sane values, report a fault too | |
if (temp < TEMPERATURE_SANE_MIN) or (temp > TEMPERATURE_SANE_MAX): | |
print('Suspect temperature probe fault. Last reading was ' + str(temp) + 'c, but sane values are ' + str(TEMPERATURE_SANE_MIN) + 'c to ' + str(TEMPERATURE_SANE_MAX) + 'c.') | |
fc = 11 | |
## if the compressor controller (arduino) hasn't responded in the last | |
## 10sec, report an error | |
if (now - last_compressor_comms) > 10: | |
print('Compressor/fault controller is unresponsive. Check connections and controller.') | |
fc = 12 | |
print('** Temperature: ' + str(temp) + 'c, average: ' + str(avg_temp) + 'c. Status code ' + str(fc) + '. Compressor speed ' + str(compressor_speed)) | |
if fc != 0: | |
ew = show_error(fc) | |
splash.append(ew) | |
time.sleep(2) | |
try: splash.remove(ew) | |
except: pass | |
if (avg_temp - set_temperature) < 2: | |
if now < boost_compressor: is_holding = set_compressor_speed(0) | |
else: is_holding = set_compressor_speed(1) | |
elif (avg_temp - set_temperature) < 4: | |
if now < boost_compressor: is_holding = set_compressor_speed(1) | |
else: is_holding = set_compressor_speed(2) | |
else: | |
is_holding = set_compressor_speed(2) | |
## if connected, send udp data to display head every 5sec | |
if wifi_connected and (now - last_udp_send) >= 5: | |
last_udp_send = now | |
message = '#10|5^20^image^snow|' | |
## current temperature | |
#message += '65^20^text:A3FCFF:None^Currently ' + str(temp) + 'c|' | |
## current temp using 7seg style digits | |
d_offset_x = 40 | |
for d in str(temp): | |
if d == '-': | |
dig = 'minus' | |
d_offset_y = 20 | |
else: | |
dig = 'd' + str(d) | |
d_offset_y = 5 | |
message += str(d_offset_x) + '^' + str(d_offset_y) + '^image^' + dig + '|' | |
d_offset_x += 25 | |
message += str(d_offset_x) + '^30^image^dc|' | |
## set temperature | |
message += '40^75^text:FFFFFF:None^Setpoint ' + str(set_temperature) + 'c|' | |
## compressor | |
if compressor_speed == 0: | |
message += '5^90^text:FFCD78:None^Compressor: Off' | |
elif compressor_speed == 1: | |
message += '5^90^text:FFCD78:None^Compressor: Low' | |
elif compressor_speed == 2: | |
message += '5^90^text:FFCD78:None^Compressor: High' | |
compressor_special = [] | |
if is_holding == True: compressor_special.append('H') | |
if now < boost_compressor: compressor_special.append('B') | |
if len(compressor_special) > 0: | |
message += ' (' + ','.join(compressor_special) + ')' | |
message += '|' | |
if fc == 0: | |
message += '5^105^text:00FF00:None^No faults reported#' | |
elif fc == 1: | |
message += '5^105^text:FF0000:None^Fault:\nBattery low#' | |
elif fc == 2: | |
message += '5^105text:FF0000:None^Fault:\nFan over-current#' | |
elif fc == 3: | |
message += '5^105^text:FF0000:None^Fault:\nRefrigerant blockage#' | |
elif fc == 4: | |
message += '5^105^text:FF0000:None^Fault:\nCompressor overload#' | |
elif fc == 5: | |
message += '5^105^text:FF0000:None^Fault:\nCompressor overheated#' | |
elif fc == 10: | |
message += '5^105^text:FF0000:None^Fault:\nFast temperature change#' | |
elif fc == 11: | |
message += '5^105^text:FF0000:None^Fault:\nImpossible temperature#' | |
elif fc == 12: | |
message += '5^105^text:FF0000:None^Fault:\nController unresponsive#' | |
else: | |
message += '5^105^text:FF0000:None^Fault:\nFault code ' + str(fc) + '#' | |
print('Sending UDP status packet [' + message + ']') | |
send_message(message) | |
new_screen = screen_current_temp(temp, set_temperature, is_holding) | |
new_screen.append(animate_snowflake(animate_frame, 100, 5)) | |
if wifi_connected == False: new_screen.append(label.Label(terminalio.FONT, text = 'Offline', x = 80, y = 10, color = 0xFFFFFF)) | |
elif secret_source == 'Backup': new_screen.append(label.Label(terminalio.FONT, text = 'Backup NW', x = 70, y = 10, color = 0xFFFFFF)) | |
if now < boost_compressor: new_screen.append(label.Label(terminalio.FONT, text = 'Boost', x = 80, y = 20, color = 0xFFFFFF)) | |
try: splash.append(new_screen) | |
except Exception as e: | |
print('**Screen draw failed:') | |
traceback.print_exception() | |
try: splash.remove(old_screen) | |
except: pass | |
del old_screen | |
old_screen = new_screen | |
animate_frame += 1 | |
if animate_frame > 17: animate_frame = 0 | |
gc.collect() | |
except Exception as err: | |
print('!!!************************!!!') | |
print('!!! An exception occurred !!!') | |
print('!!!************************!!!') | |
traceback.print_exception(err) | |
try: | |
storage.remount("/", readonly=False) | |
with open('last_error', 'w') as fp: | |
traceback.print_exception(err, file = fp) | |
storage.remount("/", readonly=True) | |
print('Wrote exception information to last_error') | |
except: | |
print('Unable to write exception information to last_error') | |
print('Restarting to recover from exception..') | |
for i in range(5): | |
print('\t' + str(5-i) + '...') | |
time.sleep(1) | |
supervisor.reload() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
still crashes out and gets in a boot loop (until wifi connect) after a few hours as of rev15