Last active
September 12, 2018 08:18
-
-
Save dcollien/c96c645c716344ac3b9f39e3603b8907 to your computer and use it in GitHub Desktop.
ESP8266 Traffic Lights
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import network | |
import time | |
CONTENT = b"""\ | |
HTTP/1.0 200 OK | |
<!doctype html> | |
<html> | |
<head> | |
<title>Configuration</title> | |
<meta name="viewport" content="width=device-width, initial-scale=1"> | |
<meta charset="utf8"> | |
</head> | |
<body> | |
<h1>Configuration</h1> | |
<form action="/api"> | |
Server Endpoint: <input type="text" placeholder="http://" name="endpoint" value="{}"/><br/> | |
Wifi SSID: <input type="text" name="ssid" value="{}"/><br/> | |
Wifi Password: <input type="text" name="password" value="{}"/><br/> | |
<input type="submit" value="Save"/> | |
</form> | |
<script> | |
</script> | |
</body> | |
</html> | |
""" | |
def url_decode(str): | |
dic = {"%21":"!","%22":'"',"%23":"#","%24":"$","%26":"&","%27":"'","%28":"(","%29":")","%2A":"*","%2B":"+","%2C":",","%2F":"/","%3A":":","%3B":";","%3D":"=","%3F":"?","%40":"@","%5B":"[","%5D":"]","%7B":"{","%7D":"}"} | |
for k,v in dic.items(): str=str.replace(k,v) | |
return str | |
def get_config(file_name): | |
print('Opening config:', file_name) | |
try: | |
f = open(file_name) | |
except: | |
f = open(file_name, w) | |
f.write(',,') | |
f.close() | |
f = open(file_name) | |
data = f.read() | |
f.close | |
print('Config contains', data) | |
parts = data.split(',') | |
if len(parts) == 3: | |
return parts | |
else: | |
return '', '', '' | |
def set_config(file_name, endpoint, ssid, password): | |
f = open(file_name, 'w') | |
data = ','.join([endpoint, ssid, password]) | |
f.write(data) | |
f.close() | |
class DNSQuery: | |
def __init__(self, data): | |
self.data=data | |
self.domain='' | |
print("Reading datagram data...") | |
m = data[2] # ord(data[2]) | |
opcode = (m >> 3) & 15 # Opcode bits | |
if opcode == 0: # Standard query | |
ini=12 | |
lon=data[ini] # ord(data[ini]) | |
while lon != 0: | |
self.domain+=data[ini+1:ini+lon+1].decode("utf-8") +'.' | |
ini+=lon+1 | |
lon=data[ini] #ord(data[ini]) | |
def response(self, ip): | |
packet=b'' | |
print("Resposta {} == {}".format(self.domain, ip)) | |
if self.domain: | |
packet+=self.data[:2] + b"\x81\x80" | |
packet+=self.data[4:6] + self.data[4:6] + b'\x00\x00\x00\x00' # Questions and Answers Counts | |
packet+=self.data[12:] # Original Domain Name Question | |
packet+= b'\xc0\x0c' # Pointer to domain name | |
packet+= b'\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04' # Response type, ttl and resource data length -> 4 bytes | |
packet+=bytes(map(int,ip.split('.'))) # 4 bytes of IP | |
return packet | |
def start_captive(ssid, password, config_file, loop_condition): | |
ap = network.WLAN(network.AP_IF) | |
ap.active(True) | |
try: | |
ap.config(essid=ssid, password=password, authmode=4) #authmode=1 == no pass | |
except OSError: | |
pass | |
# DNS Server | |
ip=ap.ifconfig()[0] | |
print('DNS Server: dom.query. 60 IN A {:s}'.format(ip)) | |
udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
udps.setblocking(False) | |
udps.bind(('',53)) | |
# Web Server | |
s = socket.socket() | |
ai = socket.getaddrinfo(ip, 80) | |
print("Web Server: Bind address info:", ai) | |
addr = ai[0][-1] | |
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
s.bind(addr) | |
s.listen(1) | |
s.settimeout(2) | |
print("Web Server: Listening http://{}:80/".format(ip)) | |
try: | |
while loop_condition(): | |
# DNS Loop | |
print("Before DNS...") | |
try: | |
data, addr = udps.recvfrom(1024) | |
print("incomming datagram...") | |
p=DNSQuery(data) | |
udps.sendto(p.response(ip), addr) | |
print('Replying: {:s} -> {:s}'.format(p.domain, ip)) | |
except: | |
print("No dgram") | |
# Web loop | |
print("before accept...") | |
accepted = False | |
try: | |
res = s.accept() | |
accepted = True | |
client_sock = res[0] | |
client_addr = res[1] | |
#print("Client address:", client_addr) | |
#print("Client socket:", client_sock) | |
client_stream = client_sock | |
print("Request:") | |
req = client_stream.readline() | |
print(req) | |
while True: | |
h = client_stream.readline() | |
if h == b"" or h == b"\r\n" or h == None: | |
break | |
print(h) | |
except: | |
print("timeout for web... moving on...") | |
if accepted: | |
# Change config based on request variables | |
method, request_url, _ = req.split(b' ') | |
url_parts = request_url.split(b'?') | |
endpoint, ssid, password = get_config(config_file) | |
if len(url_parts) == 2: | |
params = url_parts[1].decode('ascii') | |
else: | |
params = None | |
if params is not None: | |
try: | |
d = {key: value for (key, value) in [x.split('=') for x in params.split('&')]} | |
except: | |
d = {} | |
if 'endpoint' in d.keys(): | |
endpoint = url_decode(d['endpoint']) | |
if 'ssid' in d.keys(): | |
ssid = d['ssid'] | |
if 'password' in d.keys(): | |
password = d['password'] | |
set_config( | |
config_file, | |
endpoint, | |
ssid, | |
password | |
) | |
client_stream.write(CONTENT.format(endpoint, ssid, password)) | |
client_stream.close() | |
print("loop") | |
time.sleep_ms(300) | |
except KeyboardInterrupt: | |
print('Closing') | |
udps.close() | |
ap.active(False) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
D0 = 16 | |
D1 = 5 | |
D2 = 4 | |
D3 = 0 | |
D4 = 2 | |
D5 = 14 | |
D6 = 12 | |
D7 = 13 | |
D8 = 15 | |
import time | |
from machine import Pin | |
from neopixel import NeoPixel | |
from web import query_server, connect_to_wifi | |
from config import start_captive, get_config | |
NUM_LEDS = 3 | |
RED = (255, 50, 50) | |
GREEN = (0, 255, 0) | |
AMBER = (255, 120, 0) | |
OFF = (0, 0, 0) | |
MODE_ERROR = 0 | |
MODE_TEST = 1 | |
MODE_SERVER = 2 | |
MODE_CONFIG = 3 | |
MODE_CONNECTING = 4 | |
CONFIG_FILE = 'config.txt' | |
# Hardware State | |
switch = Pin(D8, Pin.IN) | |
colors = NeoPixel(Pin(D4), NUM_LEDS) | |
# Software State | |
light_state = 0b111 | |
error_mode = None | |
mode = MODE_ERROR | |
def is_config(): | |
global mode | |
return mode == MODE_CONFIG | |
def update_lights(): | |
global light_state, colors | |
colors[0] = RED if (light_state & 0b100) else OFF | |
colors[1] = AMBER if (light_state & 0b010) else OFF | |
colors[2] = GREEN if (light_state & 0b001) else OFF | |
colors.write() | |
def switch_set(irq): | |
global switch, mode, error_mode | |
if switch.value(): | |
mode = MODE_CONNECTING | |
else: | |
mode = MODE_CONFIG | |
error_mode = mode | |
def error_loop(): | |
global light_state, mode, error_mode | |
error_flash = 0 | |
for i in range(6): | |
light_state = error_flash << 1 | |
error_flash = 1 - error_flash | |
update_lights() | |
time.sleep_ms(1000) | |
# retry | |
mode = error_mode | |
error_mode = None | |
def test_loop(): | |
global light_state, mode | |
light_state = 0b100 | |
while mode == MODE_TEST: | |
light_state = light_state << 1 | |
if light_state == 0b1000: | |
light_state = 0b001 | |
update_lights() | |
time.sleep_ms(500) | |
def start_config(): | |
global light_state | |
print('Starting Config') | |
light_state = 0b111 | |
update_lights() | |
start_captive('Traffic Lights', 'iliketrafficlights', CONFIG_FILE, is_config) | |
def request_loop(): | |
global light_state, mode, error_mode | |
light_state = 0b100 | |
while mode == MODE_SERVER: | |
light_state = query_server(endpoint) | |
if light_state is not None: | |
update_lights() | |
else: | |
error_mode = mode | |
mode = MODE_ERROR | |
time.sleep_ms(1000) | |
def connect_status_callback(status): | |
global light_state | |
if status == 0: | |
light_state = 0b100 | |
elif status == 1: | |
light_state = 0b110 | |
elif status == 2: | |
light_state = 0b001 | |
update_lights() | |
switch.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=switch_set) | |
switch_set(None) | |
while True: | |
print('Loop, Mode', mode) | |
if mode == MODE_ERROR: | |
error_loop() | |
elif mode == MODE_TEST: | |
test_loop() | |
elif mode == MODE_CONFIG: | |
start_config() | |
mode = MODE_CONNECTING | |
elif mode == MODE_CONNECTING: | |
endpoint, ssid, password = get_config(CONFIG_FILE) | |
connect_to_wifi(ssid, password, connect_status_callback) | |
mode = MODE_SERVER | |
elif mode == MODE_SERVER: | |
request_loop() | |
else: | |
print("Unknown Mode", mode) | |
time.sleep_ms(1000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import network | |
import time | |
import ussl | |
def http_get(url): | |
proto, _, host, path = url.split('/', 3) | |
if proto == 'http:': | |
port = 80 | |
else: | |
port = 443 | |
addr = socket.getaddrinfo(host, port)[0][-1] | |
s = socket.socket() | |
s.connect(addr) | |
if proto == 'https:': | |
s = ussl.wrap_socket(s, server_hostname=host) | |
s.send(bytes('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host), 'utf8')) | |
response = '' | |
while True: | |
data = s.recv(100) | |
if data: | |
response += str(data, 'utf8') | |
print(str(data, 'utf8'), end='') | |
else: | |
break | |
s.close() | |
return response.split('\r\n\r\n') | |
def query_server(endpoint): | |
try: | |
headers, body = http_get(endpoint) | |
print('Server responded:', body) | |
val = int(body) | |
except Exception: | |
val = None | |
return val | |
def connect_to_wifi(essid, password, status_cb): | |
sta_if = network.WLAN(network.STA_IF) | |
status_cb(0) | |
time.sleep_ms(1000) | |
if not sta_if.isconnected(): | |
print('Connecting to network...') | |
sta_if.active(True) | |
sta_if.connect(essid, password) | |
while not sta_if.isconnected(): | |
pass | |
status_cb(1) | |
ip, subnet_mask, gateway, dns_server = sta_if.ifconfig() | |
print(ip, subnet_mask, gateway, dns_server) | |
# Set the DNS Server (optional) | |
sta_if.ifconfig((ip, subnet_mask, gateway, '8.8.8.8')) | |
print('Connected to', essid) | |
time.sleep_ms(1000) | |
status_cb(2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment