Last active
October 31, 2023 17:55
-
-
Save rniwase/4f35c886bd12bd102bc7f9525902d0ce to your computer and use it in GitHub Desktop.
MicroPython example of a WebSocket echo server on WIZNET5K + Raspberry Pi Pico (RP2040)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import usocket | |
from machine import Pin, SPI | |
import network | |
import time | |
import re | |
import hashlib | |
import ubinascii | |
# Status LED on GPIO 25; main() switches it on while a WebSocket session is
# open and off again when the connection ends.
led = Pin(25, Pin.OUT)
# W5x00 chip init
def w5x00_init():
    """Bring up the W5x00 Ethernet interface over SPI bus 0.

    Activates the interface, requests an address via DHCP, prints the
    resulting interface configuration, polls once per second until the
    interface reports a connection, and finally prints ``nic.regs()``.
    """
    bus = SPI(0, 2_000_000, mosi=Pin(19), miso=Pin(16), sck=Pin(18))
    chip_select = Pin(17)  # nCS
    reset = Pin(20)        # nRST
    nic = network.WIZNET5K(bus, chip_select, reset)
    nic.active(True)

    # Static alternative to DHCP:
    # nic.ifconfig((
    #     '192.168.10.201',  # Local IP Address
    #     '255.255.255.0',   # Subnet mask
    #     '192.168.10.1',    # Gateway
    #     '192.168.10.1'     # DNS Server
    # ))
    nic.ifconfig('dhcp')
    print('IP address :', nic.ifconfig())

    # Block until the interface reports that it is connected.
    while True:
        if nic.isconnected():
            break
        time.sleep(1)

    print(nic.regs())
def get_header(msg, header):
    r"""Return the text that follows ``header`` in ``msg`` up to the line end.

    Args:
        msg: Request text to search.
        header: Header prefix including its separator, for example
            ``'Sec-WebSocket-Key: '``.  Only the first occurrence is used.

    Returns:
        The header value as a string, or ``None`` when ``header`` does not
        occur in ``msg``.

    The line end may be a real CRLF or the escaped four-character text
    ``\r\n`` that appears when a ``bytes`` request has been converted with
    ``str()``; whichever comes first ends the value.  When neither follows
    the header, the remainder of ``msg`` is returned unchanged.
    """
    start = msg.find(header)
    if start == -1:
        return None
    value = msg[start + len(header):]

    # find() yields -1 for "not found"; using that directly as a slice bound
    # would silently drop the last character, so default to the full length.
    cut = len(value)
    for terminator in ('\\r\\n', '\r\n'):
        pos = value.find(terminator)
        if pos != -1 and pos < cut:
            cut = pos
    return value[:cut]
# Fixed GUID appended to the client's key when computing Sec-WebSocket-Accept.
_WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'


def _ws_accept_token(ws_key):
    """Return the Sec-WebSocket-Accept value for the client's key."""
    digest = hashlib.sha1((ws_key + _WS_GUID).encode()).digest()
    # b2a_base64() appends a newline that must not end up in the header.
    return ubinascii.b2a_base64(digest).decode('ascii').rstrip('\n')


def _handshake(conn):
    """Answer the HTTP request on ``conn``; return True if upgraded.

    Sends ``101 Switching Protocols`` for a version-13 WebSocket upgrade
    request on ``/`` and ``501 Not Implemented`` for anything else.
    """
    # str() of the bytes object keeps CRLF as the escaped text ``\\r\\n``,
    # which is the form get_header() scans for.
    request = str(conn.recv(1024))

    ws_key = None
    ws_ver = None
    wants_upgrade = (
        'GET / HTTP/1.1' in request
        and 'Upgrade: websocket' in request
        and 'Connection: Upgrade' in request
    )
    if wants_upgrade:
        ws_key = get_header(request, 'Sec-WebSocket-Key: ')
        ws_ver = get_header(request, 'Sec-WebSocket-Version: ')

    if ws_key is not None and ws_ver == '13':
        conn.send(
            'HTTP/1.1 101 Switching Protocols\r\n'
            'Upgrade: websocket\r\n'
            'Connection: Upgrade\r\n'
            'Sec-WebSocket-Accept: {}\r\n'
            'Server: MicroPython on RP2040\r\n'
            '\r\n'.format(_ws_accept_token(ws_key))
        )
        return True

    body = '501 Not Implemented\r\n'
    conn.send(
        'HTTP/1.1 501 Not Implemented\r\n'
        'Connection: close\r\n'
        'Content-Type: text/plain\r\n'
        'Content-Length: {}\r\n'
        '\r\n'
        '{}'.format(len(body), body)
    )
    return False


def _frame_payload(frame):
    """Return the unmasked payload bytes of one frame, or None if too short.

    Handles the 7-bit length as well as the 16-bit (126) and 64-bit (127)
    extended length forms.  Only the bytes actually present in ``frame`` are
    returned, so a frame cut short by the receive buffer never raises.
    """
    if len(frame) < 2:
        return None
    masked = frame[1] & 0x80
    length = frame[1] & 0x7F
    offset = 2  # payload (or mask key) starts right after the 2-byte header
    if length == 126:
        length = int.from_bytes(frame[2:4], 'big')
        offset = 4
    elif length == 127:
        length = int.from_bytes(frame[2:10], 'big')
        offset = 10

    if not masked:
        return bytes(frame[offset:offset + length])

    mask_key = frame[offset:offset + 4]
    offset += 4
    data = frame[offset:offset + length]
    return bytes([byte ^ mask_key[i % 4] for i, byte in enumerate(data)])


def _build_frame(first_byte, payload):
    """Return an unmasked frame with the given first byte and payload."""
    size = len(payload)
    if size < 126:
        header = bytes([first_byte, size])
    elif size < 65536:
        header = bytes([first_byte, 126]) + size.to_bytes(2, 'big')
    else:
        header = bytes([first_byte, 127]) + size.to_bytes(8, 'big')
    return header + payload


def _serve_echo(conn):
    """Echo WebSocket frames on ``conn`` until the peer closes."""
    while True:
        frame = conn.recv(1024)
        if not frame:  # TCP FIN
            break

        opcode = frame[0] & 0x0F
        if opcode == 0x08:  # WebSocket Connection Close
            conn.send(b'\x88\x00')
            print('WebSocket closed')
            break

        payload = _frame_payload(frame)
        if payload is None:
            # A lone header byte cannot be interpreted; without reassembly
            # the stream would be out of sync, so end the session.
            print('Malformed frame')
            break

        if opcode == 0x09:  # Ping: send back Pong with the same payload
            conn.send(_build_frame(0x8A, payload))
            continue

        if opcode == 0x01:  # Text: show it decoded when possible
            try:
                print(payload.decode())
            except UnicodeError:
                print(payload)
        else:
            print(payload)

        # Echo back with the first byte (FIN/opcode) exactly as received.
        # The payload stays bytes so the frame is built from one type.
        conn.send(_build_frame(frame[0], payload))


def main():
    """Initialise the network and run the WebSocket echo server forever.

    Listens on TCP port 8777 on all interfaces and serves one client at a
    time.  The LED is on while a WebSocket session is active.  A socket
    error on one connection is reported and the server keeps accepting.
    """
    w5x00_init()

    s = usocket.socket()
    s.bind(('0.0.0.0', 8777))
    s.listen(5)

    while True:
        conn, addr = s.accept()
        print('Connect from %s' % str(addr))

        upgraded = False
        try:
            upgraded = _handshake(conn)
            if upgraded:
                led.value(1)
                _serve_echo(conn)
        except OSError as exc:
            print('Connection error:', exc)
        finally:
            # Always release the socket and switch the LED off, even when
            # the session ended with an error.
            conn.close()
            led.value(0)
            if upgraded:
                print('Connection closed')
# Start the echo server only when this file is run as the main program.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment