Skip to content

Instantly share code, notes, and snippets.

@toxicantidote
Last active August 5, 2024 12:17
Show Gist options
  • Save toxicantidote/0d4f10288d91943c4ac1196b3c2ad667 to your computer and use it in GitHub Desktop.
Save toxicantidote/0d4f10288d91943c4ac1196b3c2ad667 to your computer and use it in GitHub Desktop.
Fridge GUI/control
import board
from micropython import const as S
## Screen parameters
LCD_WIDTH = S(130)
LCD_HEIGHT = S(64)
LCD_ROTATION = S(0)
## Button thresholds
THRESHOLD_BLUE = S(8000)
THRESHOLD_RED = S(50000)
## Button repeat rate (sec)
REPEAT_BUTTON = S(0.25)
## Minimum/maximum allowed settable temperature
TEMPERATURE_MIN = S(-20)
TEMPERATURE_MAX = S(10)
## Minimum and maximum tempertuares considered 'sane' values. Values outside of
## this range will be rejected as invalid
TEMPERATURE_SANE_MIN = S(-30)
TEMPERATURE_SANE_MAX = S(60)
## If the temperature differs from the 5min average by more than this,
## consider an NTC fault
TEMPERATURE_RATE = S(10)
## SPI pins
PIN_SPI_MOSI = S(board.GP19)
PIN_SPI_CLK = S(board.GP18)
## Screen pins
PIN_LCD_CS = S(board.GP22)
PIN_LCD_DC = S(board.GP21)
PIN_LCD_RESET = S(board.GP20)
## NTC pin
PIN_NTC = S(board.GP27)
## DS18x20 pin
PIN_TEMP_PROBE = S(board.GP1)
## Buzzer pin
PIN_BUZZ = S(board.GP6)
## Buttons input
PIN_BUTTON = S(board.GP26)
## UART to fridge fault/compressor board (Arduino)
PIN_UART_TX = S(board.GP12)
PIN_UART_RX = S(board.GP13)
UART_BAUD = S(9600)
## NTC thermistor resistance and temperatuire pairs (ohms/celsius)
CALIBRATION_NTC_A_RESISTANCE = S(24700)
CALIBRATION_NTC_A_CELSIUS = S(8)
CALIBRATION_NTC_B_RESISTANCE = S(10000)
CALIBRATION_NTC_B_CELSIUS = S(25)
CALIBRATION_NTC_C_RESISTANCE = S(1003)
CALIBRATION_NTC_C_CELSIUS = S(91)
## value of the other resistor in the voltage divider with the NTC
CALIBRATION_NTC_DIVIDER = S(67200)
## offset in degrees for the NTC value
CALIBRATION_NTC_OFFSET = S(0)
###
from adafruit_displayio_sh1106 import SH1106 as SPI_SCREEN
from adafruit_display_text import bitmap_label as label
from adafruit_display_shapes import rect
import displayio
import terminalio
import wifi
import math
import analogio
import time
import gc
import random
import digitalio
import socketpool
import supervisor
import busio
import re
import traceback
try:
#try:
# with open('temperature', 'r') as fp:
# set_temperature = int(fp.read())
# print('Using saved temperature ' + str(set_temperature) + 'c')
#except:
# set_temperature = 3
# print('Using default temperature ' + str(set_temperature) + 'c')
set_temperature = 3
secret_source = None
compressor_speed = 0
compressor_speed_last_set = 0
wifi_connected = False
splash = None
p_ntc = None
ssid = password = ''
sock = udp_host = udp_port = None
last_compressor_comms = time.time()
temp_avg_last = 0
temp_avg_list = dict()
error_code = 0
error_time = 0
p_button = analogio.AnalogIn(PIN_BUTTON)
p_buzz = digitalio.DigitalInOut(PIN_BUZZ)
p_buzz.direction = digitalio.Direction.OUTPUT
p_buzz.value = False
uart = None
## we give NTC's their own class so we can set properties on them. this is
## mostly just a wrapper around an analogio.AnalogIn object
class NTC():
def __init__(self, port, a_r, a_c, b_r, b_c, c_r, c_c, divider, offset):
self.ain = analogio.AnalogIn(port)
self.divider_resistor = divider
self.offset = offset
self.coefficient_a, self.coefficient_b, self.coefficient_c = self.calculate_coefficient(a_r, a_c, b_r, b_c, c_r, c_c)
@property
def value(self):
return self.ain.value
@property
def reference_voltage(self):
return self.ain.reference_voltage
def temperature(self):
samples = []
for i in range(5):
samples.append(self._temperature_single())
#time.sleep(0.1)
return (sum(samples)/len(samples))
def _temperature_single(self):
## work out the resistance of the thermistor
volt = self.reference_voltage - (self.reference_voltage * (self.ain.value / 65535))
current = (self.reference_voltage - volt) / self.divider_resistor
r_ntc = volt / current
## work out the temperatures of the thermistor in degrees
temp = (1/(self.coefficient_a+self.coefficient_b*math.log(r_ntc)+self.coefficient_c*math.pow(math.log(r_ntc),3))-273.15) + self.offset
return temp
def calculate_coefficient(self, a_r, a_c, b_r, b_c, c_r, c_c):
## convert to kelvin
a_k = a_c + 273.15
b_k = b_c + 273.15
c_k = c_c + 273.15
## calculate coefficients
c = ((1/a_k-1/b_k)-(math.log(a_r)- math.log(b_r))*(1/a_k-1/c_k)/(math.log(a_r)-math.log(c_r)))/((math.pow(math.log(a_r),3)-math.pow(math.log(b_r),3)) - (math.log(a_r)-math.log(b_r))*(math.pow(math.log(a_r),3)-math.pow(math.log(c_r),3))/(math.log(a_r)-math.log(c_r)))
b = ((1/a_k-1/b_k)-c*(math.pow(math.log(a_r),3)-math.pow(math.log(b_r),3)))/(math.log(a_r)-math.log(b_r))
a = 1/a_k-c*(math.log(a_r))*(math.log(a_r))*(math.log(a_r))-b*math.log(a_r)
return a, b, c
def screen_current_temp(current, target, compressor_is_holding):
global compressor_speed
screen_ct = displayio.Group()
screen_ct.append(label.Label(terminalio.FONT, text = str(current) + 'c', scale = 4, x = 20, y = 20))
screen_ct.append(label.Label(terminalio.FONT, text = 'Set ' + str(target) + 'c', scale = 2, x = 5, y = 45))
if compressor_is_holding == True: hold_text = ' (H)'
else: hold_text = ''
if compressor_speed == 0: screen_ct.append(label.Label(terminalio.FONT, text = 'Compressor: Off' + hold_text, x = 5, y = 60))
elif compressor_speed == 1: screen_ct.append(label.Label(terminalio.FONT, text = 'Compressor: Slow' + hold_text, x = 5, y = 60))
elif compressor_speed == 2: screen_ct.append(label.Label(terminalio.FONT, text = 'Compressor: Fast' + hold_text, x = 5, y = 60))
return screen_ct
def set_compressor_speed(speed = 0):
#global p_relay_c1, p_relay_c2
global uart
global compressor_speed, compressor_speed_last_set
## Speeds:
## 0 = off
## 1 = low speed (2000rpm)
## 2 = high speed (3500rpm)
if speed < 0 or speed > 2:
print('Invalid compressor speed')
return False
if speed != compressor_speed:
if (time.time() - compressor_speed_last_set) < 60:
print('Ignored compressor speed set. Too soon!')
return True
else:
compressor_speed_last_set = time.time()
compressor_speed = speed
if speed == 0:
#print('Compressor off')
uart.write(bytes("0", 'utf-8'))
elif speed == 1:
#print('Compressor low speed')
uart.write(bytes("1", 'utf-8'))
elif speed == 2:
#print('Compressor high speed')
uart.write(bytes("2", 'utf-8'))
return False
def check_error():
global uart, last_compressor_comms, error_code, error_time
try:
uart.write(bytes('S', 'utf-8'))
rx_data = re.sub('\n', '', uart.read(64).decode())
except:
print('UART comms exception')
return 12
if rx_data != '': last_compressor_comms = time.time()
re_error = re.search(r'STATUS (ERROR|OKAY)\s?(\d)?', rx_data)
if re_error:
error_status = re_error.group(1)
if error_status == 'OKAY': error_code = 0
else: error_code = re_error.group(2)
error_time = time.time()
return error_code
else:
## keep errors displayed for 2mins since last reported by compressor control
if (time.time() - error_time) < 120: return error_code
else: return 0
def show_error(flashes):
if flashes == 0: return
error_widgets = displayio.Group()
error_widgets.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF))
## compressor fault codes
if flashes == 1:
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nBattery low', x = 5, y = 10, color = 0xFFFFFF))
elif flashes == 2:
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nFan over-current', x = 5, y = 10, color = 0xFFFFFF))
elif flashes == 3:
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nRefrigerant block', x = 5, y = 10, color = 0xFFFFFF))
elif flashes == 4:
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nCompressor overload', x = 5, y = 10, color = 0xFFFFFF))
elif flashes == 5:
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nCompressor too hot', x = 5, y = 10, color = 0xFFFFFF))
## our fault codes
elif flashes == 10: ## temperature changed too fast
error_widgets.append(label.Label(terminalio.FONT, text = 'Rapid temperature\nchange, check probe.', x = 5, y = 10, color = 0xFFFFFF))
elif flashes == 11: ## temperature outside sane values
error_widgets.append(label.Label(terminalio.FONT, text = 'Impossible temperature\n check probe.', x = 5, y = 10, color = 0xFFFFFF))
elif flashes == 12: ## compressor controller (arduino) unresponsive
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nController offline', x = 5, y = 10, color = 0xFFFFFF))
## anything else
else:
error_widgets.append(label.Label(terminalio.FONT, text = 'FAULT DETECTED!\nFault code ' + str(flashes), x = 5, y = 10, color = 0xFFFFFF))
return error_widgets
## scans wireless networks
def scan_wifi(splash, x_status, y_offset):
print('Wi-Fi scan: Scanning')
y_offset_original = y_offset
splash.append(label.Label(terminalio.FONT, text = 'Wi-Fi', color = 0xFFFFFF, x = 5, y = y_offset))
label_scanning = label.Label(terminalio.FONT, text = '[SCAN]', color = 0xFFFFFF, x = x_status, y = y_offset)
splash.append(label_scanning)
networks_s = wifi.radio.start_scanning_networks()
networks = []
for net in networks_s:
networks.append([net.ssid, net.channel, net.rssi])
networks.sort(key=lambda x: x[2], reverse=False)
print('Wi-Fi scan: Sorting')
net_labels = []
y_offset += 10
displayed_nets = []
for net in networks:
if net[0] == '': continue ## ignore hidden nets
if net[0] in displayed_nets: continue ## ignore duplicates (CP bug #7318)
else: displayed_nets.append(net[0])
if len(displayed_nets) >= 4: break ## limit net count
label_net = label.Label(terminalio.FONT, text = net[0], color = 0xFFFFFF, x = 10, y = y_offset)
net_labels.append(label_net)
splash.append(label_net)
y_offset += 10
hidden = len(networks) - len(net_labels)
if hidden > 0:
label_net = label.Label(terminalio.FONT, text = '+' + str(hidden) + ' other networks', color = 0xFFFFFF, x = 10, y = y_offset)
net_labels.append(label_net)
splash.append(label_net)
print('Wi-Fi scan: Waiting')
try: splash.remove(label_scanning)
except: pass
label_continue_new = None
for i in range(3):
label_continue_new = label.Label(terminalio.FONT, text = '[Wait ' + str(3-i) + ']', color = 0xFFFFFF, x = x_status, y = y_offset_original)
splash.append(label_continue_new)
try: splash.remove(label_continue)
except: pass
label_continue = label_continue_new
del label_continue_new
time.sleep(1)
for lbl in net_labels:
try: splash.remove(lbl)
except: pass
try: splash.remove(label_continue)
except: pass
print('Wi-Fi scan: Done!')
## connect to wifi network
def connect_wifi(splash, x_status, y_offset, y_secret, label_secret_source, ssid, password):
global wifi_connected
if ssid == None or password == None:
print('Wi-Fi connect: No secrets available')
if splash != None:
label_connect = label.Label(terminalio.FONT, text = '[NO SECRETS]', color = 0xFFFFFF, x = x_status, y = y_offset)
splash.append(label_connect)
return
for i in range(5):
print('Wi-Fi connect: Connecting')
if splash != None:
label_connect = label.Label(terminalio.FONT, text = '[ATTEMPT ' + str(i+1) + '/5]', color = 0xFFFFFF, x = (x_status-30), y = y_offset)
splash.append(label_connect)
label_ssid = label.Label(terminalio.FONT, text = 'SSID: ' + ssid, color = 0xFFFFFF, x = 10, y = y_offset + 10)
splash.append(label_ssid)
try:
wifi.radio.connect(ssid=ssid, password=password)
try: splash.remove(label_connect)
except: pass
print('Wi-Fi connect: Connected')
init_udp()
if splash != None:
label_connect = label.Label(terminalio.FONT, text = '[OK]', color = 0xFFFFFF, x = (x_status-20), y = y_offset)
splash.append(label_connect)
print("\tIP:", wifi.radio.ipv4_address)
print("\tNetmask:", wifi.radio.ipv4_subnet)
if splash != None:
splash.append(label.Label(terminalio.FONT, text = 'IP: ' + str(wifi.radio.ipv4_address), color = 0xFFFFFF, x = 10, y = y_offset + 20))
splash.append(label.Label(terminalio.FONT, text = 'NM: ' + str(wifi.radio.ipv4_subnet), color = 0xFFFFFF, x = 10, y = y_offset + 30))
wifi_connected = True
break
except:
wifi_connected = False
try:
splash.remove(label_connect)
splash.remove(label_ssid)
except: pass
print('Wi-Fi connect: Connection failed')
if splash != None:
label_connect = label.Label(terminalio.FONT, text = '[FAIL]', color = 0xFFFFFF, x = (x_status-20), y = y_offset)
splash.append(label_connect)
if splash == None or i == 4: return
time.sleep(0.5)
for i in range(0, 3):
lc_old = label_connect
print('\tRetrying in ' + str(3-i) + '..')
label_connect = label.Label(terminalio.FONT, text = '[RETRY ' + str(3-i) + ']', color = 0xFFFFFF, x = (x_status - 20), y = y_offset)
try: splash.remove(lc_old)
except: pass
splash.append(label_connect)
time.sleep(1)
try:
splash.remove(label_secret_source)
splash.remove(label_connect)
except: pass
if get_button() == 'blue': ## pressed blue
print('\tForced backup secrets due to button press')
label_secret_source = label.Label(terminalio.FONT, text = '[BACKUP]', color = 0xFFFFFF, x = x_status, y = y_secret)
try:
from secrets import backup as secrets
ssid = secrets['ssid']
password = secrets['password']
except:
print('Backup secrets missing')
return
else:
print('\tRetrying with main secrets')
label_secret_source = label.Label(terminalio.FONT, text = '[MAIN]', color = 0xFFFF, x = x_status, y = y_secret)
splash.append(label_secret_source)
print('Wi-Fi connect: Done!')
def beep(length = 0.1, repeat = 1):
global p_buzz
for i in range(repeat):
p_buzz.value = True
time.sleep(length)
p_buzz.value = False
if repeat > 1: time.sleep(length)
## get button state
def get_button():
global p_button
if p_button.value > THRESHOLD_RED:
beep()
return 'red'
elif p_button.value < THRESHOLD_BLUE:
beep()
return 'blue'
else:
return str(p_button.value)
def init_secret():
bp = get_button()
secret_source = None
## blue button forces backup secrets
if bp == 'blue':
print('Secrets: Forced backup secrets due to button press')
try:
from secrets import backup as secrets
secret_source = 'Backup'
except:
pass
## otherwise try main secrets first
else:
print('Secrets: Looking for main secrets')
try:
from secrets import secrets
secret_source = 'Main'
except:
try:
print('Secrets: Main secret failed. Trying backup secrets')
from secrets import backup as secrets
secret_source = 'Primary'
except:
pass
if secret_source == None:
print('Secrets: No valid secrets found')
else:
try:
return secret_source, secrets['ssid'], secrets['password']
except:
print('Secrets: Found secrets file, but format was invalid. Must have ssid and password keys')
print('Secrets: Returning blank data')
return None, None, None
def init_system():
global PIN_SPI_CLK, PIN_SPI_MOSI
global ssid, password, secret_source
global PIN_LCD_DC, PIN_LCD_CS, PIN_LCD_RESET, LCD_WIDTH, LCD_HEIGHT, LCD_ROTATION, splash
global p_ntc, PIN_NTC, CALIBRATION_INTERNAL_NTC_A_RESISTANCE, CALIBRATION_INTERNAL_NTC_A_CELSIUS, CALIBRATION_INTERNAL_NTC_B_RESISTANCE, CALIBRATION_INTERNAL_NTC_B_CELSIUS, CALIBRATION_INTERNAL_NTC_C_RESISTANCE, CALIBRATION_INTERNAL_NTC_C_CELSIUS, CALIBRATION_NTC_DIVIDER, CALIBRATION_NTC_OFFSET
global PIN_UART_TX, PIN_UART_RX, UART_BAUD
global uart
print('Init: Release hardware')
try:
displayio.release_displays()
w_radio.stop_scanning_networks()
except: pass
## SPI init
print('Init: SPI')
spi = busio.SPI(PIN_SPI_CLK, MOSI = PIN_SPI_MOSI)
## Display init
print('Init: Display bus')
display_bus = displayio.FourWire(spi, command = PIN_LCD_DC, chip_select = PIN_LCD_CS, reset = PIN_LCD_RESET, baudrate = 1000000)
print('Init: Display parameters')
display = SPI_SCREEN(display_bus, width = LCD_WIDTH, height = LCD_HEIGHT, rotation = LCD_ROTATION)
print('Init: Display rendering')
splash = displayio.Group()
display.show(splash)
## status messages
x_status = 80
print('Init: Display messages')
splash.append(label.Label(terminalio.FONT, text = 'Display', color = 0xFFFFFF, x = 5, y = 10))
splash.append(label.Label(terminalio.FONT, text = '[OK]', color = 0x00FF00, x = x_status, y = 10))
## configure external NTC thermistor
print('Init: NTC external')
splash.append(label.Label(terminalio.FONT, text = 'External NTC', color = 0xFFFFFF, x = 5, y = 20))
p_ntc = NTC(PIN_NTC, CALIBRATION_NTC_A_RESISTANCE, CALIBRATION_NTC_A_CELSIUS, CALIBRATION_NTC_B_RESISTANCE, CALIBRATION_NTC_B_CELSIUS, CALIBRATION_NTC_C_RESISTANCE, CALIBRATION_NTC_C_CELSIUS, CALIBRATION_NTC_DIVIDER, CALIBRATION_NTC_OFFSET)
temp = p_ntc.temperature()
print('\tTemperature: ' + str(temp) + 'c')
if (temp < TEMPERATURE_SANE_MIN) or (temp > TEMPERATURE_SANE_MAX):
splash.append(label.Label(terminalio.FONT, text = '!' + str(int(temp)) + 'c !', color = 0xFFFFFF, x = x_status, y = 20))
print('\t\tValue not sane!')
else:
splash.append(label.Label(terminalio.FONT, text = '[' + str(int(temp)) + 'c]', color = 0xFFFFFF, x = x_status, y = 20))
print('\t\tValue ok')
## UART
splash.append(label.Label(terminalio.FONT, text = 'UART', color = 0xFFFFFF, x = 5, y = 30))
try:
uart = busio.UART(PIN_UART_TX, PIN_UART_RX, baudrate=UART_BAUD, timeout = 0.25)
splash.append(label.Label(terminalio.FONT, text = '[OK]', color = 0xFFFFFF, x = x_status, y = 30))
except:
splash.append(label.Label(terminalio.FONT, text = '[FAIL]', color = 0xFFFFFF, x = x_status, y = 30))
## set initial compressor speed
splash.append(label.Label(terminalio.FONT, text = 'Compressor', color = 0xFFFFFF, x = 5, y = 40))
if (temp - set_temperature) < 2:
set_compressor_speed(0)
splash.append(label.Label(terminalio.FONT, text = '[LOW]', color = 0x0000FF, x = x_status, y = 40))
elif (temp - set_temperature) < 4:
set_compressor_speed(1)
splash.append(label.Label(terminalio.FONT, text = '[MID]', color = 0x00FF00, x = x_status, y = 40))
else:
set_compressor_speed(2)
splash.append(label.Label(terminalio.FONT, text = '[MAX]', color = 0xFF0000, x = x_status, y = 40))
## new screen
time.sleep(1)
splash_old = splash
del splash
splash = displayio.Group()
display.show(splash)
## get secrets
splash.append(label.Label(terminalio.FONT, text = 'Secrets', color = 0xFFFFFF, x = 5, y = 5))
secret_source, ssid, password = init_secret()
if secret_source is None:
splash.append(label.Label(terminalio.FONT, text = '[FAILED]', color = 0xFFFFFF, x = x_status, y = 5))
return
else:
label_secret_source = label.Label(terminalio.FONT, text = '[' + secret_source + ']', color = 0xFFFFFF, x = x_status, y = 5)
splash.append(label_secret_source)
## wireless scan
print('Init: Wi-Fi scan')
scan_wifi(splash, x_status, 15)
## wireless connect
print('Init: Wi-Fi connect')
connect_wifi(splash, x_status, 15, 5, label_secret_source, ssid, password)
if wifi_connected == False:
print('Gave up trying to connect to Wi-Fi. Will try again later')
splash.append(label.Label(terminalio.FONT, text = 'Will retry later', color = 0xFFFFFF, x = 5, y = 35))
## countdown to start
label_countdown = None
print('Init: Start delay')
for i in range(3):
print('\t' + str(3-i) + '...')
label_countdown_new = label.Label(terminalio.FONT, text = 'Starting in ' + str(3-i) + 's..', color = 0xFFFFFF, x = 5, y = LCD_HEIGHT - 5)
try: splash.remove(label_countdown)
except: pass
splash.append(label_countdown_new)
label_countdown = label_countdown_new
time.sleep(1)
## new screen
display.root_group = None
del splash
splash = displayio.Group()
display.show(splash)
print('Init: Completed!')
def animate_snowflake(frame, x, y):
frame_content = displayio.Group()
x1, y1 = random.randint(x, x + 12), y + (frame * 3) + random.randint(1,3)
x2, y2 = random.randint(x + 10, x + 20), y + (frame * 3) + random.randint(1,4)
x3, y3 = random.randint(x + 18, x + 25), y + (frame * 3) + random.randint(1,3)
frame_content.append(label.Label(terminalio.FONT, text = 'X', x = x1, y = y1, color = 0xFFFFFF))
frame_content.append(label.Label(terminalio.FONT, text = '+', x = x1, y = y1, color = 0xFFFFFF))
frame_content.append(label.Label(terminalio.FONT, text = 'X', x = x2, y = y2, color = 0xFFFFFF))
frame_content.append(label.Label(terminalio.FONT, text = '+', x = x2, y = y2, color = 0xFFFFFF))
frame_content.append(label.Label(terminalio.FONT, text = 'X', x = x3, y = y3, color = 0xFFFFFF))
frame_content.append(label.Label(terminalio.FONT, text = '+', x = x3, y = y3, color = 0xFFFFFF))
return frame_content
def ipStrToBin(addrStr):
addrBin = 0
for octet in addrStr.split("."):
addrBin = addrBin << 8 | int(octet)
return addrBin
def ipBinToStr(addrBin):
octetsStr = []
for i in range(4):
octetsStr.append(str(addrBin>>(8*(3-i))&255))
return ".".join(octetsStr)
def init_udp():
global udp_host, udp_port, sock
print('UDP sender: Initialising')
## set up UDP sender
pool = socketpool.SocketPool(wifi.radio)
ipAddressBin = ipStrToBin(str(wifi.radio.ipv4_address))
subnetMaskBin = ipStrToBin(str(wifi.radio.ipv4_subnet))
networkAddressBin = ipAddressBin & subnetMaskBin
broadcastAddressBin = ipAddressBin | (subnetMaskBin ^ (1<<32)-1)
networkAddressStr = ipBinToStr(networkAddressBin)
udp_host = ipBinToStr(broadcastAddressBin)
udp_port = 5005
sock = pool.socket(pool.AF_INET, pool.SOCK_DGRAM) # UDP, and we'l reuse it each time
sock.settimeout(2)
sock.setblocking(False)
print('UDP sender: Ready')
def send_message(message):
global sock, udp_host, udp_port
if wifi.radio.connected == False:
print('\tNot sending message. We are offline')
return
try:
b_message = bytes(str(message), 'utf-8')
sock.sendto(b_message, (udp_host,udp_port) ) # send UDP packet to udp_host:port
print('\tMessage sent')
except:
print('\tMessage send failed')
init_system()
old_screen = None
last_button = 0
animate_frame = 0
last_wifi_try = time.time()
temp_change = False
last_temp_change = time.time()
last_udp_send = 0
boost_compressor = 0
print('Starting event loop')
while True:
now = time.time()
## change temp on button presses
if (now - last_button) > REPEAT_BUTTON:
button_press = get_button()
if button_press == 'blue':
set_temperature += -1
print('Decreasing temperature')
#temp_change = True
elif button_press == 'red':
set_temperature += 1
print('Increasing temperature')
#temp_change = True
#if temp_change == True and (now - last_temp_change) > 3:
# last_temp_change = now
# temp_change = False
#
# try:
# storage.remount("/", readonly=False)
# with open('temperature', 'w') as fp:
# fp.write(str(set_temperature))
# print('Saved temperature ' + str(set_temperature) + 'c to filesystem')
# except:
# print('Could not save temperature to filesystem')
# raise
# finally:
# storage.remount("/", readonly=True)
## keep temperature within allowed values
temperature_warning_elements = []
if set_temperature < TEMPERATURE_MIN:
set_temperature = TEMPERATURE_MIN
temperature_warning_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF))
temperature_warning_elements.append(label.Label(terminalio.FONT, text = 'Minimum temperature\nset!', x = 5, y = 10, color = 0xFFFFFF))
elif set_temperature > TEMPERATURE_MAX:
set_temperature = TEMPERATURE_MAX
temperature_warning_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF))
temperature_warning_elements.append(label.Label(terminalio.FONT, text = 'Maximum temperature\nset!', x = 5, y = 10, color = 0xFFFFFF))
if len(temperature_warning_elements) > 0:
print('Temperature setting limit reached')
for e in temperature_warning_elements:
splash.append(e)
beep(0.25, 2)
time.sleep(1)
for e in temperature_warning_elements:
try: splash.remove(e)
except: pass
if wifi_connected == False and (now - last_wifi_try) > 30:
reconnect_elements = displayio.Group()
reconnect_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF))
reconnect_elements.append(label.Label(terminalio.FONT, text = 'Retrying Wi-Fi\nconnection...', x = 5, y = 10, color = 0xFFFFFF))
splash.append(reconnect_elements)
connect_wifi(None, 0, 0, 0, None, ssid, password)
try: splash.remove(reconnect_elements)
except: pass
reconnect_elements = displayio.Group()
reconnect_elements.append(rect.Rect(2, 1, LCD_WIDTH-2, 35, fill = 0x000000, outline = 0xFFFFFF))
if wifi_connected is True:
reconnect_elements.append(label.Label(terminalio.FONT, text = 'Connection ok\nIP ' + str(wifi.radio.ipv4_address), x = 5, y = 10, color = 0xFFFFFF))
else:
reconnect_elements.append(label.Label(terminalio.FONT, text = 'Connection fail\nRetry in 30s..', x = 5, y = 10, color = 0xFFFFFF))
splash.append(reconnect_elements)
last_wifi_try = now
time.sleep(1)
try: splash.remove(reconnect_elements)
except: pass
fc = check_error()
## get the temp, but do compressor speed based on 1min average
temp = int(p_ntc.temperature())
if (now - temp_avg_last) > 5:
temp_avg_list[now] = temp
temp_avg_last = now
for t in temp_avg_list.keys():
if (now - t) > 60:
del temp_avg_list[t]
avg_temp_list = []
for ts in temp_avg_list.keys():
avg_temp_list.append(temp_avg_list[ts])
avg_temp = int(sum(avg_temp_list) / len(avg_temp_list))
if (temp > avg_temp) and (compressor_speed != 0) and ((now - compressor_speed_last_set) > 30):
print('Temperature is increasing while compressor is running. Suspect high thermal load. Boosting compressor speed for 5mins.')
boost_compressor = now + 300
## if the temperature is changing too fast, the sensor may be faulty
if (temp - avg_temp) > TEMPERATURE_RATE or (avg_temp - temp) > TEMPERATURE_RATE:
print('Is the fridge closed? Temperature average is ' + str(avg_temp) + 'c but last reading was ' + str(temp) + 'c. Max allowed deviation is ' + str(TEMPERATURE_RATE) + 'c.')
fc = 10
## if the temperature is outside sane values, report a fault too
if (temp < TEMPERATURE_SANE_MIN) or (temp > TEMPERATURE_SANE_MAX):
print('Suspect temperature probe fault. Last reading was ' + str(temp) + 'c, but sane values are ' + str(TEMPERATURE_SANE_MIN) + 'c to ' + str(TEMPERATURE_SANE_MAX) + 'c.')
fc = 11
## if the compressor controller (arduino) hasn't responded in the last
## 10sec, report an error
if (now - last_compressor_comms) > 10:
print('Compressor/fault controller is unresponsive. Check connections and controller.')
fc = 12
print('** Temperature: ' + str(temp) + 'c, average: ' + str(avg_temp) + 'c. Status code ' + str(fc) + '. Compressor speed ' + str(compressor_speed))
if fc != 0:
ew = show_error(fc)
splash.append(ew)
time.sleep(2)
try: splash.remove(ew)
except: pass
if (avg_temp - set_temperature) < 2:
if now < boost_compressor: is_holding = set_compressor_speed(0)
else: is_holding = set_compressor_speed(1)
elif (avg_temp - set_temperature) < 4:
if now < boost_compressor: is_holding = set_compressor_speed(1)
else: is_holding = set_compressor_speed(2)
else:
is_holding = set_compressor_speed(2)
## if connected, send udp data to display head every 5sec
if wifi_connected and (now - last_udp_send) >= 5:
last_udp_send = now
message = '#10|5^20^image^snow|'
## current temperature
#message += '65^20^text:A3FCFF:None^Currently ' + str(temp) + 'c|'
## current temp using 7seg style digits
d_offset_x = 40
for d in str(temp):
if d == '-':
dig = 'minus'
d_offset_y = 20
else:
dig = 'd' + str(d)
d_offset_y = 5
message += str(d_offset_x) + '^' + str(d_offset_y) + '^image^' + dig + '|'
d_offset_x += 25
message += str(d_offset_x) + '^30^image^dc|'
## set temperature
message += '40^75^text:FFFFFF:None^Setpoint ' + str(set_temperature) + 'c|'
## compressor
if compressor_speed == 0:
message += '5^90^text:FFCD78:None^Compressor: Off'
elif compressor_speed == 1:
message += '5^90^text:FFCD78:None^Compressor: Low'
elif compressor_speed == 2:
message += '5^90^text:FFCD78:None^Compressor: High'
compressor_special = []
if is_holding == True: compressor_special.append('H')
if now < boost_compressor: compressor_special.append('B')
if len(compressor_special) > 0:
message += ' (' + ','.join(compressor_special) + ')'
message += '|'
if fc == 0:
message += '5^105^text:00FF00:None^No faults reported#'
elif fc == 1:
message += '5^105^text:FF0000:None^Fault:\nBattery low#'
elif fc == 2:
message += '5^105text:FF0000:None^Fault:\nFan over-current#'
elif fc == 3:
message += '5^105^text:FF0000:None^Fault:\nRefrigerant blockage#'
elif fc == 4:
message += '5^105^text:FF0000:None^Fault:\nCompressor overload#'
elif fc == 5:
message += '5^105^text:FF0000:None^Fault:\nCompressor overheated#'
elif fc == 10:
message += '5^105^text:FF0000:None^Fault:\nFast temperature change#'
elif fc == 11:
message += '5^105^text:FF0000:None^Fault:\nImpossible temperature#'
elif fc == 12:
message += '5^105^text:FF0000:None^Fault:\nController unresponsive#'
else:
message += '5^105^text:FF0000:None^Fault:\nFault code ' + str(fc) + '#'
print('Sending UDP status packet [' + message + ']')
send_message(message)
new_screen = screen_current_temp(temp, set_temperature, is_holding)
new_screen.append(animate_snowflake(animate_frame, 100, 5))
if wifi_connected == False: new_screen.append(label.Label(terminalio.FONT, text = 'Offline', x = 80, y = 10, color = 0xFFFFFF))
elif secret_source == 'Backup': new_screen.append(label.Label(terminalio.FONT, text = 'Backup NW', x = 70, y = 10, color = 0xFFFFFF))
if now < boost_compressor: new_screen.append(label.Label(terminalio.FONT, text = 'Boost', x = 80, y = 20, color = 0xFFFFFF))
try: splash.append(new_screen)
except Exception as e:
print('**Screen draw failed:')
traceback.print_exception()
try: splash.remove(old_screen)
except: pass
del old_screen
old_screen = new_screen
animate_frame += 1
if animate_frame > 17: animate_frame = 0
gc.collect()
except Exception as err:
print('!!!************************!!!')
print('!!! An exception occurred !!!')
print('!!!************************!!!')
traceback.print_exception(err)
try:
storage.remount("/", readonly=False)
with open('last_error', 'w') as fp:
traceback.print_exception(err, file = fp)
storage.remount("/", readonly=True)
print('Wrote exception information to last_error')
except:
print('Unable to write exception information to last_error')
print('Restarting to recover from exception..')
for i in range(5):
print('\t' + str(5-i) + '...')
time.sleep(1)
supervisor.reload()
@toxicantidote
Copy link
Author

still crashes out and gets in a boot loop (until wifi connect) after a few hours as of rev15

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment