Skip to content

Instantly share code, notes, and snippets.

@russhughes
Created November 13, 2022 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save russhughes/3deb6265cad776ea2a23e1ffc90cda76 to your computer and use it in GitHub Desktop.
Save russhughes/3deb6265cad776ea2a23e1ffc90cda76 to your computer and use it in GitHub Desktop.
MicroPython temperature server for the ESP8266 yellow board and DS18x20 sensor
# Based on web example from https://RandomNerdTutorials.com
try:
import usocket as socket
except:
import socket
from machine import Pin, PWM, ADC, RTC
from time import sleep, time
import onewire, ds18x20
import network
import ure
import ntptime
import ujson
import config
import esp
import gc
def ntp_update():
    """Set the RTC from NTP and report whether that worked.

    Calls ``ntptime.settime()`` and then prints the module-level ``rtc``'s
    current datetime. Any exception raised by either step is caught, a
    failure message is printed, and ``False`` is returned.

    Returns:
        True when the time was set and printed, False otherwise.
    """
    synced = False
    try:
        ntptime.settime()      # push the NTP time into the RTC
        print(rtc.datetime())  # echo what the RTC now reports
        synced = True
    except Exception:
        print("NTP query failed.")
    return synced
# --- Board and network bring-up -------------------------------------------

# Turn off vendor OS debug messages and start from a freshly collected heap.
esp.osdebug(None)
gc.collect()

# Seconds between NTP resynchronisations.
ntp_update_interval = 3600

# Disable the access-point interface; only station mode is used.
ap = network.WLAN(network.AP_IF)
ap.active(False)

station = network.WLAN(network.STA_IF)
station.active(True)
# NOTE(review): Wi-Fi credentials are hard-coded in source; consider moving
# them into the `config` module alongside IP/LOCATION.
station.connect('DRS', 'gocougars09')

# Block until the station has associated; sleep briefly between polls
# instead of spinning flat out.
while not station.isconnected():
    sleep(0.1)

# Apply the static address: (ip, netmask, gateway, dns).
station.ifconfig((config.IP, '255.255.255.0', '10.10.0.1', '10.10.0.1'))
print('Connection successful', station.ifconfig())

# initialize the RTC and get the current time from NTP
rtc = RTC()
# 0 marks "never synced", which makes the main loop retry.
ntp_last_sync = time() if ntp_update() else 0

# --- Peripherals ------------------------------------------------------------

# DS18x20 temperature sensor(s) on the OneWire bus at GPIO 4.
ds_sensor = ds18x20.DS18X20(onewire.OneWire(Pin(4)))

# Analog input read by lights() and the API handlers.
adc = ADC(0)

# RGB LED channels.
# NOTE(review): the positional 0, 0 are presumably freq and duty - confirm
# against the firmware's PWM signature.
red = PWM(Pin(13), 0, 0)
green = PWM(Pin(12), 0, 0)
blue = PWM(Pin(15), 0, 0)

# Matches "GET /r=<red>,<green>,<blue>" requests. Raw string so the \d
# escapes reach the regex engine unchanged (pattern value is unchanged).
regex = ure.compile(r"GET /r=(\d+),(\d+),(\d+).*")
def read_ds_sensor():
    """Read the first valid temperature from the DS18x20 bus.

    Scans the bus via the module-level ``ds_sensor``, starts a conversion,
    and returns the first reading that is a float, rounded to two decimals.

    Returns:
        float: the temperature as reported by ``read_temp`` rounded to two
        decimals, or ``0.0`` when no device yields a float reading. The
        fallback is a float (not bytes) because every caller does arithmetic
        on, or JSON-encodes, the result.
    """
    roms = ds_sensor.scan()
    # NOTE(review): there is no delay between convert_temp() and read_temp();
    # the driver documentation asks for ~750 ms, so this presumably returns
    # the result of the *previous* conversion - confirm that is intended.
    ds_sensor.convert_temp()
    for rom in roms:
        temp = ds_sensor.read_temp(rom)
        if isinstance(temp, float):
            print(temp, end=' ')
            print('Valid temperature')
            return round(temp, 2)
    return 0.0
def lights():
    """Return "Off" when the ADC reading is above 30, otherwise "On"."""
    if adc.read() > 30:
        return "Off"
    return "On"
def web_page():
    """Build the HTML status page.

    Reads the temperature once, then assembles a page showing
    ``config.LOCATION``, the temperature in Celsius and Fahrenheit, the raw
    ADC reading and the On/Off state from ``lights()``.

    Returns:
        str: the complete HTML document.
    """
    celsius = read_ds_sensor()
    # NOTE(review): the stylesheet below contains a stray closing brace after
    # the .data rule; it is kept exactly as in the original markup.
    # The parts are evaluated left to right, so the ADC is read once for the
    # raw value and once more inside lights(), as before.
    parts = [
        """<!DOCTYPE HTML>
<html><head><meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://use.fontawesome.com/releases/v5.7.2/css/all.css" integrity="sha384-fnmOCqbTlWIlj8LyTjo7mOUStjsKC4pOpQbqyi7RrhN7udi9RwhKkMHpvLbHG9Sr" crossorigin="anonymous">
<style> html { font-family: Arial; display: inline-block; margin: 0px auto; text-align: center; }
h2 { font-size: 1.5rem; } p { font-size: 1.5rem; } .units { font-size: 1.2rem; }
.ds-labels{ font-size: 1.5rem; vertical-align:middle; padding-bottom: 15px; }
table { margin-left: auto; margin-right: auto; }
td { font-size: 1.5rem; vertical-align:middle; padding: 15px; }
.data { text-align: right; }
}
</style></head><body><h2>Location:""",
        config.LOCATION,
        """</h2>
<table>
<tr>
<td><i class="fas fa-thermometer-half" style="color:#059e8a;"></i></td>
<td>Temperature</td>
<td class="data">""",
        str(celsius),
        """</td>
<td><sup class="units">&deg;C</sup></td>
</tr>
<tr>
<td><i class="fas fa-thermometer-half" style="color:#059e8a;"></i></td>
<td>Temperature</td>
<td class="data">""",
        str(round(celsius * (9/5) + 32.0, 2)),
        """</td>
<td><sup class="units">&deg;F</sup></td>
</tr>
<tr>
<td><i class="fas fa-lightbulb" style="color:#059e8a;"></i></td>
<td>Lights</td>
<td class="data">""",
        str(adc.read()),
        """</td>
<td class="data">""",
        lights(),
        """</td>
</tr>
</table>
</body></html>""",
    ]
    return "".join(parts)
def api_c():
    """Return the current sensor reading (Celsius) as a string."""
    return str(read_ds_sensor())
def api_f():
    """Return the current sensor reading converted to Fahrenheit, as a string.

    The converted value is rounded to two decimals before formatting.
    """
    celsius = read_ds_sensor()
    fahrenheit = celsius * (9/5) + 32.0
    return str(round(fahrenheit, 2))
def api_j():
    """Return a JSON document with timestamp, temperatures and ADC reading.

    Keys: "date" (RTC time formatted as "Y-MM-DD HH:MM:SS"), "c" (sensor
    reading), "f" (reading converted to Fahrenheit, two decimals) and
    "l" (raw ADC value).
    """
    celsius = read_ds_sensor()
    stamp = rtc.datetime()
    # Index 3 of the RTC tuple is skipped; only date and time fields are used.
    year, month, day = stamp[0], stamp[1], stamp[2]
    hour, minute, second = stamp[4], stamp[5], stamp[6]
    now = "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
        year, month, day, hour, minute, second
    )
    fahrenheit = round(celsius * (9/5) + 32.0, 2)
    payload = {"date": now, "c": celsius, "f": fahrenheit, "l": adc.read()}
    return ujson.dumps(payload)
def api_l():
    """Return the raw ADC reading as a string."""
    level = adc.read()
    return str(level)
# --- HTTP server ------------------------------------------------------------

# Take one reading before serving requests.
read_ds_sensor()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)

# Compiled once instead of on every request; used only for logging the path.
request_line_re = ure.compile(r"GET (.*?) HTTP\/1\.1")

# Status line and headers sent before every body. HTTP requires CRLF line
# endings and an empty line between headers and body.
RESPONSE_HEADERS = (
    'HTTP/1.1 200 OK\r\n'
    'Content-Type: text/html\r\n'
    'Connection: close\r\n\r\n'
)

# Upper bound for PWM duty on the ESP8266 port (documented range 0-1023).
PWM_DUTY_MAX = 1023

while True:
    # Reset per iteration so the cleanup below never touches a connection
    # from a previous pass (or an unbound name if accept() itself fails).
    conn = None
    try:
        # NOTE(review): 102000 bytes looks larger than a typical ESP8266
        # heap, so this probably collects on every pass - confirm intent.
        if gc.mem_free() < 102000:
            gc.collect()

        conn, addr = s.accept()
        conn.settimeout(3.0)  # do not hang on clients that never send
        # str() of the received bytes yields text of the form "b'GET ...'",
        # which is why substring tests and search() (not match()) are used.
        request = str(conn.recv(1024))
        conn.settimeout(None)
        print("Request: ", request)

        parms = request_line_re.search(request)
        if parms:
            print("parms: ", parms)

        if 'GET /?c' in request:
            response = api_c()
        elif 'GET /?f' in request:
            response = api_f()
        elif 'GET /?j' in request:
            response = api_j()
        elif 'GET /?l' in request:
            response = api_l()
        elif 'GET /r=' in request:
            # search() rather than match(): the text starts with "b'", so an
            # anchored match could never succeed.
            match = regex.search(request)
            print("match: ", match)
            if match:
                # Set each channel's duty cycle, clamped to the valid range
                # so an oversized number in the URL cannot raise.
                red.duty(min(int(match.group(1)), PWM_DUTY_MAX))
                green.duty(min(int(match.group(2)), PWM_DUTY_MAX))
                blue.duty(min(int(match.group(3)), PWM_DUTY_MAX))
                response = "ok"
            else:
                response = "error"
        else:
            response = web_page()

        conn.sendall(RESPONSE_HEADERS)
        conn.sendall(response)
    except OSError as e:
        # Network hiccup (timeout, reset, ...): log it and keep serving.
        print("Connection error:", e)
    finally:
        # Always release the client socket, including when a handler raised
        # something other than OSError.
        if conn is not None:
            conn.close()

    # Resync the clock once the interval has elapsed, or retry right away
    # if no sync has succeeded yet (ntp_last_sync == 0).
    if (time() - ntp_last_sync) > ntp_update_interval or ntp_last_sync == 0:
        print("Syncing time")
        if ntp_update():
            ntp_last_sync = time()
            print("Time synced")
        else:
            print("Error syncing time")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment