Created
November 8, 2017 18:51
-
-
Save larsks/1cf5f8cd613522cc15feebb93433cc32 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import network | |
import select | |
import socket | |
import time | |
import ubinascii as binascii | |
# Defaults for the discovery service; each one can be overridden per
# instance through the Discover constructor arguments.
DISCOVER_PORT = 2112                  # TCP port the HTTP endpoint listens on
DISCOVER_SSID_PREFIX = 'sensor'       # the SSID is '<prefix>-<station MAC in hex>'
DISCOVER_PASSWORD = 'sensornet'       # WPA/WPA2 passphrase of the access point

# Handles for the two WLAN interfaces: access point and station.
ap_if = network.WLAN(network.AP_IF)
sta_if = network.WLAN(network.STA_IF)
def read_headers(conn):
    """Read and parse an HTTP request head from *conn*.

    Lines are read with ``conn.readline()`` until a blank line or end of
    stream; anything after the blank line (the request body) is left unread.

    Returns a ``(method, uri, version, headers)`` tuple. ``method``, ``uri``
    and ``version`` are bytes; tokens missing from the request line default
    to ``b''``, ``b''`` and ``b'HTTP/1.0'`` respectively (so an empty request
    yields ``(b'', b'', b'HTTP/1.0', {})`` instead of raising). ``headers``
    maps lower-cased header names to their values, both as bytes with
    surrounding whitespace (including the trailing CRLF) removed.
    """
    lines = []
    while True:
        line = conn.readline()
        # An empty read means the peer closed; a bare line ending is the
        # separator between the head and the body.
        if not line or line in (b'\r\n', b'\n'):
            break
        lines.append(line)

    parts = lines[0].split() if lines else []
    # Pad whatever tokens are missing with bytes defaults, then keep at
    # most three so a request line with extra tokens still unpacks.
    defaults = [b'', b'', b'HTTP/1.0']
    method, uri, version = (parts + defaults[len(parts):])[:3]

    headers = {}
    for raw in lines[1:]:
        # Split on the first colon only: values such as "host:port" or
        # URLs legitimately contain further colons.
        pieces = raw.split(b':', 1)
        if len(pieces) != 2:
            continue  # not a "name: value" line; ignore it
        name, value = pieces
        headers[name.strip().lower()] = value.strip()

    return method, uri, version, headers
class Discover:
    """Wi-Fi access point with a minimal HTTP endpoint for configuration.

    The instance brings up the access-point interface under an SSID derived
    from the station MAC address, then listens on a TCP port and answers
    ``PUT /config.json`` requests.

    NOTE(review): the request body is never read, so the "accepted"
    configuration is not actually consumed anywhere in this class.
    """

    def __init__(self,
                 port=DISCOVER_PORT,
                 ssid_prefix=DISCOVER_SSID_PREFIX,
                 password=DISCOVER_PASSWORD,
                 channel=1):
        """Store the settings and compute the SSID.

        port: TCP port to listen on (coerced with ``int``).
        ssid_prefix: text placed before the MAC address in the SSID.
        password: passphrase used for the access point.
        channel: Wi-Fi channel passed to the access-point configuration.
        """
        self.port = int(port)
        self.ssid_prefix = ssid_prefix
        self.channel = channel
        self.password = password
        self.ssid = self.get_ssid()

    def get_ssid(self):
        """Return '<ssid_prefix>-<mac>' using the station MAC in hex."""
        sta = network.WLAN(network.STA_IF)
        mac = binascii.hexlify(sta.config('mac')).decode('utf8')
        return '{}-{}'.format(self.ssid_prefix, mac)

    def start_network(self):
        """Activate the access point and apply SSID, channel and password."""
        ap_if.active(True)
        # Configure only once the interface reports itself active.
        while not ap_if.active():
            time.sleep(0.5)
        ap_if.config(essid=self.ssid,
                     channel=self.channel,
                     authmode=network.AUTH_WPA_WPA2_PSK,
                     password=self.password)

    def stop_network(self):
        """Deactivate the access point."""
        ap_if.active(False)

    def create_socket(self):
        """Return a TCP socket listening on ``self.port`` on all addresses."""
        s = socket.socket()
        try:
            s.setsockopt(socket.SOL_SOCKET,
                         socket.SO_REUSEADDR,
                         1)
            s.bind(('', self.port))
            s.listen(1)
        except Exception:
            # Do not leak the socket when bind/listen fails.
            s.close()
            raise
        return s

    def serve(self):
        """Accept connections forever, handling them one at a time.

        A failure while handling one connection is printed and the loop
        continues. The listening socket is closed if the loop is left
        through an exception (for example KeyboardInterrupt).
        """
        listener = self.create_socket()
        poller = select.poll()
        poller.register(listener, select.POLLIN)
        try:
            while True:
                # Index rather than unpack: poll() entries may carry more
                # than two elements depending on platform and version.
                for ready in poller.poll():
                    conn, addr = ready[0].accept()
                    print('* new connection from', addr)
                    try:
                        self.handle_connection(conn)
                    except Exception as exc:
                        # One misbehaving client must not stop the server.
                        print('! error handling connection:', exc)
        finally:
            poller.unregister(listener)
            listener.close()

    def send_response(self, conn, status, status_text, body=None):
        """Send a status line, an empty header section and optional body.

        conn: connected socket to write to.
        status: numeric HTTP status code.
        status_text: reason phrase placed after the status code.
        body: optional str or bytes payload sent after the blank line.
        """
        head = 'HTTP/1.1 %d %s\r\n\r\n' % (status, status_text)
        conn.send(head.encode('utf8'))
        if body is not None:
            if isinstance(body, str):
                body = body.encode('utf8')
            conn.send(body)

    def handle_connection(self, conn):
        """Answer one request on *conn* and always close the connection.

        ``PUT /config.json`` gets a 200 response; anything else gets a
        400 response.
        """
        try:
            method, uri = read_headers(conn)[:2]
            if method == b'PUT' and uri == b'/config.json':
                self.send_response(conn, 200, 'Okay',
                                   'Configuration accepted')
            else:
                # A request we do not understand is a client error (4xx),
                # not a server error (5xx).
                self.send_response(conn, 400, 'Bad request',
                                   'Bad request')
        finally:
            conn.close()
# Script entry: build the service with its default settings, bring up the
# access point, then block in the request-serving loop.
d = Discover()
d.start_network()
d.serve()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment