Skip to content

Instantly share code, notes, and snippets.

@dcollien
Last active September 12, 2018 08:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcollien/c96c645c716344ac3b9f39e3603b8907 to your computer and use it in GitHub Desktop.
Save dcollien/c96c645c716344ac3b9f39e3603b8907 to your computer and use it in GitHub Desktop.
ESP8266 Traffic Lights
import socket
import network
import time
# Complete HTTP response served by the captive portal for every request.
# The three "{}" placeholders are filled, in order, with the stored
# endpoint, Wi-Fi SSID and Wi-Fi password.
# The empty line after the status line is mandatory: it terminates the HTTP
# header section, so everything after it is treated as the HTML body.
CONTENT = b"""\
HTTP/1.0 200 OK

<!doctype html>
<html>
<head>
<title>Configuration</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta charset="utf8">
</head>
<body>
<h1>Configuration</h1>
<form action="/api">
Server Endpoint: <input type="text" placeholder="http://" name="endpoint" value="{}"/><br/>
Wifi SSID: <input type="text" name="ssid" value="{}"/><br/>
Wifi Password: <input type="text" name="password" value="{}"/><br/>
<input type="submit" value="Save"/>
</form>
<script>
</script>
</body>
</html>
"""
def url_decode(str):
    """Decode a URL/form-encoded string.

    Every ``%XX`` escape (upper- or lower-case hex) is turned back into the
    byte it stands for, ``+`` is treated as a space (HTML form encoding), and
    the resulting bytes are interpreted as UTF-8.  A ``%`` that is not
    followed by two hex digits is kept literally.

    The parameter keeps its historical name ``str`` so existing keyword
    callers continue to work; the builtin is therefore not used in here.

    Returns the decoded text.
    """
    hex_digits = '0123456789abcdefABCDEF'
    # '+' must be translated before the escapes so that "%2B" survives as '+'.
    pieces = str.replace('+', ' ').split('%')
    raw = bytearray(pieces[0].encode('utf-8'))
    for piece in pieces[1:]:
        if len(piece) >= 2 and piece[0] in hex_digits and piece[1] in hex_digits:
            raw.append(int(piece[:2], 16))
            raw.extend(piece[2:].encode('utf-8'))
        else:
            # Not a valid escape: keep the '%' and whatever followed it.
            raw.extend(b'%')
            raw.extend(piece.encode('utf-8'))
    try:
        return bytes(raw).decode('utf-8')
    except UnicodeError:
        # Escapes did not form valid UTF-8; fall back to one char per byte.
        return ''.join(chr(byte) for byte in raw)
def get_config(file_name):
    """Load the stored configuration from *file_name*.

    The file holds ``endpoint,ssid,password`` on a single comma-separated
    line.  If the file cannot be opened it is created containing ``,,``
    (three empty fields) and then read back.

    Returns the three fields as a list when the file contains exactly three
    comma-separated parts, otherwise the tuple ``('', '', '')``.
    """
    print('Opening config:', file_name)
    try:
        f = open(file_name)
    except OSError:
        # First run (or unreadable file): seed it with three empty fields.
        with open(file_name, 'w') as new_file:
            new_file.write(',,')
        f = open(file_name)
    # Only the open is guarded above, so a failing read never overwrites an
    # existing configuration; the handle is always closed here.
    with f:
        data = f.read()
    print('Config contains', data)
    parts = data.split(',')
    if len(parts) == 3:
        return parts
    return '', '', ''
def set_config(file_name, endpoint, ssid, password):
    """Persist the configuration to *file_name*.

    The three values are written as one comma-separated line,
    ``endpoint,ssid,password``, replacing any previous content.
    """
    # Build the payload first so bad arguments cannot leave a truncated file.
    data = ','.join([endpoint, ssid, password])
    # The context manager closes the file even if the write fails.
    with open(file_name, 'w') as f:
        f.write(data)
class DNSQuery:
    """Minimal DNS query parser that can build a single-A-record reply.

    ``data`` is the raw datagram.  ``domain`` is the dotted name of the first
    question (with a trailing dot), or ``''`` when the packet is not a
    standard query.  A packet too short to parse raises ``IndexError``.
    """

    def __init__(self, data):
        self.data = data
        self.domain = ''
        # Offset just past the first question; only that span is echoed back.
        self._question_end = len(data)
        print("Reading datagram data...")
        flags_high = data[2]
        opcode = (flags_high >> 3) & 15  # Opcode bits
        if opcode == 0:  # Standard query
            labels = []
            pos = 12  # the question section starts after the 12-byte header
            length = data[pos]
            while length != 0:
                labels.append(data[pos + 1:pos + length + 1].decode("utf-8") + '.')
                pos += length + 1
                length = data[pos]
            self.domain = ''.join(labels)
            # Skip the terminating zero label plus QTYPE and QCLASS (2 bytes each).
            self._question_end = pos + 5

    def response(self, ip):
        """Return a reply packet resolving the queried name to *ip*.

        *ip* is a dotted-quad string.  Returns ``b''`` when no domain was
        parsed from the query.
        """
        print("Resposta {} == {}".format(self.domain, ip))
        if not self.domain:
            return b''
        return b''.join((
            self.data[:2],  # transaction id copied from the query
            b"\x81\x80",  # flags
            # Exactly one question and one answer follow, no other records.
            b'\x00\x01\x00\x01\x00\x00\x00\x00',
            # Echo only the question: anything after it in the query (e.g. an
            # EDNS record) would otherwise sit before the answer and make the
            # reply unparseable.
            self.data[12:self._question_end],
            b'\xc0\x0c',  # Pointer to domain name
            # Response type, ttl and resource data length -> 4 bytes
            b'\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04',
            bytes(map(int, ip.split('.'))),  # 4 bytes of IP
        ))
def _html_escape(value):
    """Escape *value* for use inside a double-quoted HTML attribute."""
    return (value.replace('&', '&amp;').replace('"', '&quot;')
            .replace('<', '&lt;').replace('>', '&gt;'))


def _serve_dns_once(udps, ip):
    """Answer at most one pending DNS query, resolving every name to *ip*."""
    print("Before DNS...")
    try:
        data, client = udps.recvfrom(1024)
    except OSError:
        # The socket is non-blocking: nothing was waiting.
        print("No dgram")
        return
    print("incomming datagram...")
    try:
        p = DNSQuery(data)
        udps.sendto(p.response(ip), client)
        print('Replying: {:s} -> {:s}'.format(p.domain, ip))
    except (IndexError, ValueError, OSError) as e:
        # Truncated/undecodable query or failed send: drop it, keep serving.
        print("Ignoring bad datagram:", e)


def _read_request_line(client_stream):
    """Read one HTTP request, echoing it; return its request line (may be empty/None)."""
    print("Request:")
    req = None
    try:
        req = client_stream.readline()
        print(req)
        while True:
            h = client_stream.readline()
            if not h or h == b"\r\n":
                break
            print(h)
    except OSError:
        print("timeout for web... moving on...")
    return req


def _update_config_from_url(request_url, config_file):
    """Apply query-string settings in *request_url* to the stored config.

    Returns the (possibly updated) ``(endpoint, ssid, password)``.
    """
    endpoint, ssid, password = get_config(config_file)
    url_parts = request_url.split(b'?')
    if len(url_parts) == 2:
        try:
            params = url_parts[1].decode('ascii')
            d = {key: value for (key, value) in [x.split('=') for x in params.split('&')]}
        except ValueError:
            # Undecodable bytes or a pair that is not exactly "key=value".
            d = {}
        # All three fields arrive form-encoded, so all three are decoded.
        if 'endpoint' in d:
            endpoint = url_decode(d['endpoint'])
        if 'ssid' in d:
            ssid = url_decode(d['ssid'])
        if 'password' in d:
            password = url_decode(d['password'])
        set_config(config_file, endpoint, ssid, password)
    return endpoint, ssid, password


def _serve_http_once(s, config_file):
    """Accept at most one HTTP client and answer it with the config page."""
    print("before accept...")
    try:
        client_sock, client_addr = s.accept()
    except OSError:
        # The listening socket has a timeout; no client connected in time.
        print("timeout for web... moving on...")
        return
    try:
        req = _read_request_line(client_sock)
        parts = req.split(b' ') if req else []
        if len(parts) < 2:
            # No usable "METHOD URL ..." request line; nothing to answer.
            return
        endpoint, ssid, password = _update_config_from_url(parts[1], config_file)
        # NOTE(review): relies on bytes.format, which MicroPython provides but
        # CPython does not - confirm on the target firmware.
        client_sock.write(CONTENT.format(
            _html_escape(endpoint), _html_escape(ssid), _html_escape(password)))
    except OSError as e:
        print("Web request failed:", e)
    finally:
        # Always release the client connection, including on early return.
        client_sock.close()


def start_captive(ssid, password, config_file, loop_condition):
    """Run the captive configuration portal until *loop_condition* is false.

    Brings up an access point named *ssid* protected by *password*, then
    alternates between a DNS responder on UDP port 53 (every name resolves to
    the access point's own address) and a web server on port 80 that shows
    the settings stored in *config_file* and saves any submitted via the
    query string.

    *loop_condition* is called with no arguments before each iteration.
    On exit - normal, KeyboardInterrupt or error - both sockets are closed
    and the access point is switched off, so the portal can be started again.
    """
    ap = network.WLAN(network.AP_IF)
    ap.active(True)
    try:
        # authmode=4 selects WPA/WPA2-PSK (see the MicroPython WLAN docs).
        ap.config(essid=ssid, password=password, authmode=4)
    except OSError as e:
        # Best effort: keep going with whatever the AP is configured as.
        print('AP config failed:', e)
    ip = ap.ifconfig()[0]
    udps = None
    s = None
    try:
        # DNS Server
        print('DNS Server: dom.query. 60 IN A {:s}'.format(ip))
        udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udps.setblocking(False)
        udps.bind(('', 53))
        # Web Server
        s = socket.socket()
        ai = socket.getaddrinfo(ip, 80)
        print("Web Server: Bind address info:", ai)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(ai[0][-1])
        s.listen(1)
        s.settimeout(2)
        print("Web Server: Listening http://{}:80/".format(ip))
        while loop_condition():
            _serve_dns_once(udps, ip)
            _serve_http_once(s, config_file)
            print("loop")
            time.sleep_ms(300)
    except KeyboardInterrupt:
        print('Closing')
    finally:
        for sock in (udps, s):
            if sock is not None:
                sock.close()
        ap.active(False)
# Board pin labels D0..D8 and the pin numbers handed to machine.Pin
# (presumably NodeMCU-style labels mapped to ESP8266 GPIO numbers - confirm
# against the board in use).
D0, D1, D2 = 16, 5, 4
D3, D4, D5 = 0, 2, 14
D6, D7, D8 = 12, 13, 15
import time
from machine import Pin
from neopixel import NeoPixel
from web import query_server, connect_to_wifi
from config import start_captive, get_config
# Number of pixels on the strip; update_lights() writes indices 0, 1 and 2.
NUM_LEDS = 3
# Colour tuples stored into the NeoPixel buffer (presumably (R, G, B) order -
# confirm against the strip in use).
RED = (255, 50, 50)
GREEN = (0, 255, 0)
AMBER = (255, 120, 0)
OFF = (0, 0, 0)
# Values of the global `mode`, which the main loop dispatches on.
MODE_ERROR = 0
MODE_TEST = 1
MODE_SERVER = 2
MODE_CONFIG = 3
MODE_CONNECTING = 4
# File from which get_config() loads the endpoint, SSID and password.
CONFIG_FILE = 'config.txt'
# Hardware State
# Mode switch input; switch_set() reads it to pick connecting vs. config mode.
switch = Pin(D8, Pin.IN)
colors = NeoPixel(Pin(D4), NUM_LEDS)
# Software State
# Bitmask of lit lamps: 0b100 = red, 0b010 = amber, 0b001 = green.
light_state = 0b111
# Mode that error_loop() switches back to once it has finished flashing.
error_mode = None
mode = MODE_ERROR
def is_config():
    """Return True while the global mode is MODE_CONFIG."""
    return MODE_CONFIG == mode
def update_lights():
    """Push the light_state bitmask out to the three pixels.

    Pixel 0 is red (bit 0b100), pixel 1 amber (0b010), pixel 2 green
    (0b001); a cleared bit switches that pixel off.
    """
    lamps = ((0b100, RED), (0b010, AMBER), (0b001, GREEN))
    for index, (mask, colour) in enumerate(lamps):
        colors[index] = colour if (light_state & mask) else OFF
    colors.write()
def switch_set(irq):
    """Switch handler: choose the mode from the switch position.

    A truthy switch value selects MODE_CONNECTING, otherwise MODE_CONFIG.
    The same value is stored in error_mode.  *irq* is not used.
    """
    global mode, error_mode
    selected = MODE_CONNECTING if switch.value() else MODE_CONFIG
    mode = selected
    error_mode = selected
def error_loop():
    """Blink the amber lamp for six one-second steps, then retry.

    The lamp alternates off/on starting with off.  Afterwards the mode
    recorded in error_mode becomes the current mode and error_mode is
    cleared.
    """
    global light_state, mode, error_mode
    for step in range(6):
        # Odd steps light bit 0b010 (amber); even steps are dark.
        light_state = (step % 2) << 1
        update_lights()
        time.sleep_ms(1000)
    # retry
    mode, error_mode = error_mode, None
def test_loop():
    """Cycle a single lit lamp every 500 ms while in MODE_TEST."""
    global light_state
    light_state = 0b100
    while mode == MODE_TEST:
        shifted = light_state << 1
        # Wrap around once the bit has moved past the top lamp.
        light_state = 0b001 if shifted == 0b1000 else shifted
        update_lights()
        time.sleep_ms(500)
def start_config():
    """Light all three lamps and run the captive configuration portal.

    Returns when start_captive() returns, which it does once is_config()
    reports that the mode is no longer MODE_CONFIG.
    """
    global light_state
    print('Starting Config')
    light_state = 0b111
    update_lights()
    start_captive(
        ssid='Traffic Lights',
        password='iliketrafficlights',
        config_file=CONFIG_FILE,
        loop_condition=is_config,
    )
def request_loop():
    """Poll the server once a second and show the state it reports.

    Runs while the mode is MODE_SERVER.  Each poll calls
    query_server(endpoint), where ``endpoint`` is the module-level value
    loaded by the main loop.  A failed poll (``None``) records the current
    mode in error_mode and switches to MODE_ERROR, which ends the loop.
    """
    global light_state, mode, error_mode
    light_state = 0b100
    while mode == MODE_SERVER:
        state = query_server(endpoint)
        if state is not None:
            light_state = state
            update_lights()
        else:
            # Keep the last good light_state rather than storing None in the
            # shared bitmask, which update_lights() could not handle.
            error_mode = mode
            mode = MODE_ERROR
        time.sleep_ms(1000)
def connect_status_callback(status):
    """Show Wi-Fi connection progress on the lamps.

    Status 0 lights red, 1 lights red and amber, 2 lights green; any other
    value leaves the pattern unchanged.  The lamps are refreshed either way.
    """
    global light_state
    patterns = {0: 0b100, 1: 0b110, 2: 0b001}
    if status in patterns:
        light_state = patterns[status]
    update_lights()
# Re-evaluate the mode whenever the switch changes in either direction.
switch.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=switch_set)
# Seed the mode from the switch's current position before the first loop.
switch_set(None)
# Main state machine: each branch runs one mode's handler, which returns
# when that mode is finished or has been changed.
while True:
    print('Loop, Mode', mode)
    if mode == MODE_ERROR:
        error_loop()
    elif mode == MODE_TEST:
        test_loop()
    elif mode == MODE_CONFIG:
        start_config()
        mode = MODE_CONNECTING
    elif mode == MODE_CONNECTING:
        endpoint, ssid, password = get_config(CONFIG_FILE)
        connect_to_wifi(ssid, password, connect_status_callback)
        # Only advance if the switch interrupt has not picked another mode
        # while connecting; otherwise that request would be overwritten.
        if mode == MODE_CONNECTING:
            mode = MODE_SERVER
    elif mode == MODE_SERVER:
        request_loop()
    else:
        print("Unknown Mode", mode)
        time.sleep_ms(1000)
import socket
import network
import time
import ussl
def http_get(url):
    """Fetch *url* with a minimal HTTP/1.0 GET request.

    *url* has the form ``proto://host[:port][/path]``.  ``http:`` defaults to
    port 80 and anything else to 443; only ``https:`` is wrapped in TLS.

    Returns the response split once at the first blank line, i.e.
    ``[headers, body]`` (a single-element list if there is no blank line).
    Raises ValueError for a URL without ``//`` and OSError on network
    failure; the socket is closed in every case.
    """
    proto, _, rest = url.split('/', 2)
    location = rest.split('/', 1)
    netloc = location[0]
    # A URL with no path at all ("http://host") requests "/".
    path = location[1] if len(location) == 2 else ''
    port = 80 if proto == 'http:' else 443
    host = netloc
    if ':' in netloc:
        host, port_text = netloc.split(':', 1)
        port = int(port_text)
    addr = socket.getaddrinfo(host, port)[0][-1]
    s = socket.socket()
    try:
        s.connect(addr)
        if proto == 'https:':
            s = ussl.wrap_socket(s, server_hostname=host)
        # Use the stream methods write()/read(): MicroPython offers them on
        # both plain and TLS-wrapped sockets, whereas TLS objects lack
        # send()/recv().
        s.write(bytes('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, netloc), 'utf8'))
        chunks = []
        while True:
            data = s.read(100)
            if not data:
                break
            chunks.append(data)
    finally:
        s.close()
    # Decode once at the end so a multi-byte character split across two
    # reads cannot break the decoding.
    response = str(b''.join(chunks), 'utf8')
    print(response, end='')
    # Split only at the first blank line so a body that itself contains
    # blank lines still yields exactly [headers, body].
    return response.split('\r\n\r\n', 1)
def query_server(endpoint):
    """Ask the server at *endpoint* for the current light state.

    Returns the response body converted to an int, or None if the request
    or the conversion fails for any reason.
    """
    try:
        _, body = http_get(endpoint)
        print('Server responded:', body)
        return int(body)
    except Exception:
        return None
def connect_to_wifi(essid, password, status_cb):
    """Join the Wi-Fi network *essid*, reporting progress via *status_cb*.

    *status_cb* is called with 0 before connecting, 1 once the station
    reports it is connected, and 2 after the DNS server has been set.
    Blocks until the station interface is connected.
    """
    station = network.WLAN(network.STA_IF)
    status_cb(0)
    time.sleep_ms(1000)
    if not station.isconnected():
        print('Connecting to network...')
        station.active(True)
        station.connect(essid, password)
        # Wait here until the interface reports a connection.
        while True:
            if station.isconnected():
                break
    status_cb(1)
    settings = station.ifconfig()
    ip, subnet_mask, gateway, dns_server = settings
    print(ip, subnet_mask, gateway, dns_server)
    # Set the DNS Server (optional)
    station.ifconfig((ip, subnet_mask, gateway, '8.8.8.8'))
    print('Connected to', essid)
    time.sleep_ms(1000)
    status_cb(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment