Skip to content

Instantly share code, notes, and snippets.

@larsks
Created November 8, 2017 18:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save larsks/1cf5f8cd613522cc15feebb93433cc32 to your computer and use it in GitHub Desktop.
Save larsks/1cf5f8cd613522cc15feebb93433cc32 to your computer and use it in GitHub Desktop.
import network
import select
import socket
import time
import ubinascii as binascii
DISCOVER_PORT = 2112
DISCOVER_SSID_PREFIX = 'sensor'
DISCOVER_PASSWORD = 'sensornet'
ap_if = network.WLAN(network.AP_IF)
sta_if = network.WLAN(network.STA_IF)
def read_headers(conn):
req = []
while True:
line = conn.readline()
if not line or line == b'\r\n':
break
req.append(line)
method, uri, version = (req[0].split() + ['HTTP/1.0'])[:3]
method = method
uri = uri
version = version
headers = {}
for header in req[1:]:
name, value = header.split(b': ')
headers[name.lower()] = value
return method, uri, version, headers
class Discover:
def __init__(self,
port=DISCOVER_PORT,
ssid_prefix=DISCOVER_SSID_PREFIX,
password=DISCOVER_PASSWORD,
channel=1):
self.port = int(port)
self.ssid_prefix = ssid_prefix
self.channel = channel
self.password = password
self.ssid = self.get_ssid()
def get_ssid(self):
sta_if = network.WLAN(network.STA_IF)
mac = binascii.hexlify(sta_if.config('mac')).decode('utf8')
return '{}-{}'.format(
self.ssid_prefix, mac)
def start_network(self):
ap_if.active(True)
while not ap_if.active():
time.sleep(0.5)
ap_if.config(essid=self.ssid,
channel=self.channel,
authmode=network.AUTH_WPA_WPA2_PSK,
password=self.password)
def stop_network(self):
ap_if.active(False)
def create_socket(self):
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR,
1)
s.bind(('', self.port))
s.listen(1)
return s
def serve(self):
s = self.create_socket()
p = select.poll()
p.register(s, select.POLLIN)
while True:
for obj, event in p.poll():
conn, addr = s.accept()
print('* new connection from %s', addr)
self.handle_connection(conn)
def send_response(self, conn, status, status_text, body=None):
conn.send('HTTP/1.1 %d %s\r\n\r\n' % (status, status_text))
if body is not None:
conn.send(body)
def handle_connection(self, conn):
method, uri, version, headers = read_headers(conn)
if method != b'PUT' or uri != b'/config.json':
self.send_response(conn, 500, 'Bad request',
'Bad request')
else:
self.send_response(conn, 200, 'Okay',
'Configuration accepted')
conn.close()
d = Discover()
d.start_network()
d.serve()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment