Created
November 13, 2022 17:12
-
-
Save russhughes/3deb6265cad776ea2a23e1ffc90cda76 to your computer and use it in GitHub Desktop.
Micropython temp server for esp8266 yellow board and ds18x20
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based on web example from https://RandomNerdTutorials.com | |
try: | |
import usocket as socket | |
except: | |
import socket | |
from machine import Pin, PWM, ADC, RTC | |
from time import sleep, time | |
import onewire, ds18x20 | |
import network | |
import ure | |
import ntptime | |
import ujson | |
import config | |
import esp | |
import gc | |
def ntp_update(): | |
try: | |
ntptime.settime() # set the RTC's time using ntptime | |
print(rtc.datetime()) # print the current time from the RTC | |
return True | |
except Exception: | |
print("NTP query failed.") | |
return False | |
esp.osdebug(None) | |
gc.collect() | |
ntp_update_interval = 3600 | |
ap = network.WLAN(network.AP_IF) | |
ap.active(False) | |
station = network.WLAN(network.STA_IF) | |
station.active(True) | |
station.connect('DRS', 'gocougars09') | |
while station.isconnected() == False: | |
pass | |
station.ifconfig((config.IP, '255.255.255.0', '10.10.0.1', '10.10.0.1')) | |
print('Connection successful', station.ifconfig()) | |
# initialize the RTC and get the current time from NTP | |
rtc = RTC() | |
ntp_last_sync = time() if ntp_update() else 0 | |
ds_sensor = ds18x20.DS18X20(onewire.OneWire(Pin(4))) | |
adc = ADC(0) | |
red = PWM(Pin(13), 0, 0) | |
green = PWM(Pin(12), 0, 0) | |
blue = PWM(Pin(15), 0, 0) | |
regex = ure.compile("GET /r=(\d+),(\d+),(\d+).*") | |
def read_ds_sensor(): | |
roms = ds_sensor.scan() | |
ds_sensor.convert_temp() | |
for rom in roms: | |
temp = ds_sensor.read_temp(rom) | |
if isinstance(temp, float): | |
msg = round(temp, 2) | |
print(temp, end=' ') | |
print('Valid temperature') | |
return msg | |
return b'0.0' | |
def lights(): | |
return "Off" if adc.read() > 30 else "On" | |
def web_page(): | |
temp = read_ds_sensor() | |
return """<!DOCTYPE HTML> | |
<html><head><meta name="viewport" content="width=device-width, initial-scale=1"> | |
<link rel="stylesheet" href="https://use.fontawesome.com/releases/v5.7.2/css/all.css" integrity="sha384-fnmOCqbTlWIlj8LyTjo7mOUStjsKC4pOpQbqyi7RrhN7udi9RwhKkMHpvLbHG9Sr" crossorigin="anonymous"> | |
<style> html { font-family: Arial; display: inline-block; margin: 0px auto; text-align: center; } | |
h2 { font-size: 1.5rem; } p { font-size: 1.5rem; } .units { font-size: 1.2rem; } | |
.ds-labels{ font-size: 1.5rem; vertical-align:middle; padding-bottom: 15px; } | |
table { margin-left: auto; margin-right: auto; } | |
td { font-size: 1.5rem; vertical-align:middle; padding: 15px; } | |
.data { text-align: right; } | |
} | |
</style></head><body><h2>Location:""" + config.LOCATION + """</h2> | |
<table> | |
<tr> | |
<td><i class="fas fa-thermometer-half" style="color:#059e8a;"></i></td> | |
<td>Temperature</td> | |
<td class="data">""" + str(temp) + """</td> | |
<td><sup class="units">°C</sup></td> | |
</tr> | |
<tr> | |
<td><i class="fas fa-thermometer-half" style="color:#059e8a;"></i></td> | |
<td>Temperature</td> | |
<td class="data">""" + str(round(temp * (9/5) + 32.0, 2)) + """</td> | |
<td><sup class="units">°F</sup></td> | |
</tr> | |
<tr> | |
<td><i class="fas fa-lightbulb" style="color:#059e8a;"></i></td> | |
<td>Lights</td> | |
<td class="data">""" + str(adc.read()) + """</td> | |
<td class="data">""" + lights() + """</td> | |
</tr> | |
</table> | |
</body></html>""" | |
def api_c(): | |
temp = read_ds_sensor() | |
return str(temp) | |
def api_f(): | |
temp = read_ds_sensor() | |
return str(round(temp * (9/5) + 32.0, 2)) | |
def api_j(): | |
temp = read_ds_sensor() | |
dt = rtc.datetime() | |
now = "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(dt[0], dt[1],dt[2], dt[4], dt[5], dt[6]) | |
return ujson.dumps({"date": now, "c": temp, "f": round(temp * (9/5) + 32.0, 2), "l": adc.read()}) | |
def api_l(): | |
return str(adc.read()) | |
read_ds_sensor() | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.bind(('', 80)) | |
s.listen(5) | |
while True: | |
try: | |
if gc.mem_free() < 102000: | |
gc.collect() | |
conn, addr = s.accept() | |
conn.settimeout(3.0) | |
request = str(conn.recv(1024)) | |
conn.settimeout(None) | |
print("Request: ", request) | |
parms = ure.search("GET (.*?) HTTP\/1\.1", request) | |
if parms: | |
print ("parms: ", parms) | |
if 'GET /?c' in request: | |
response = api_c() | |
elif 'GET /?f' in request: | |
response = api_f() | |
elif 'GET /?j' in request: | |
response = api_j() | |
elif 'GET /?l' in request: | |
response = api_l() | |
elif 'GET /r=' in request: | |
match = regex.match(request) | |
print("match: ", match) | |
if match: | |
red.PWM(int(match[1])) | |
green.PWM(int(match[2])) | |
blue.PWM(int(match[3])) | |
response = "ok" | |
else: | |
response = "error" | |
else: | |
response = web_page() | |
conn.send('HTTP/1.1 200 OK\n') | |
conn.send('Content-Type: text/html\n') | |
conn.send('Connection: close\n\n') | |
conn.sendall(response) | |
conn.close() | |
except OSError as e: | |
conn.close() | |
if (time() - ntp_last_sync) > 3600 or ntp_last_sync == 0: | |
print("Syncing time") | |
if ntp_update(): | |
ntp_last_sync = time() | |
print("Time synced") | |
else: | |
print("Error syncing time") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment