Skip to content

Instantly share code, notes, and snippets.

@rniwase
Last active October 31, 2023 17:55
Show Gist options
  • Save rniwase/4f35c886bd12bd102bc7f9525902d0ce to your computer and use it in GitHub Desktop.
Save rniwase/4f35c886bd12bd102bc7f9525902d0ce to your computer and use it in GitHub Desktop.
MicroPython example of a WebSocket echo server on WIZNET5K + RPi Pico (RP2040)
import usocket
from machine import Pin, SPI
import network
import time
import re
import hashlib
import ubinascii
# Output pin on GPIO 25, used as a status indicator: main() sets it to 1 after
# a successful WebSocket handshake and back to 0 when the connection ends.
# NOTE(review): presumably the board's on-board LED - confirm for the target board.
led = Pin(25, Pin.OUT)
#W5x00 chip init
def w5x00_init():
    """Bring up the WIZNET5K Ethernet interface and block until it reports a link.

    Creates SPI bus 0 (2 MHz; MOSI=GP19, MISO=GP16, SCK=GP18), attaches the
    WIZNET5K driver with chip-select on GP17 and reset on GP20, activates
    it, requests an address via DHCP and prints the resulting configuration.
    It then polls ``isconnected()`` once per second, printing the chip
    registers after every wait, until the interface reports connected.

    NOTE(review): the listing this was recovered from had lost its
    indentation; the register dump is assumed to belong inside the wait loop.
    """
    bus = SPI(0, 2_000_000, mosi=Pin(19), miso=Pin(16), sck=Pin(18))
    chip_select = Pin(17)  # nCS
    reset = Pin(20)        # nRST
    eth = network.WIZNET5K(bus, chip_select, reset)
    eth.active(True)

    # For a static configuration, pass a 4-tuple instead of 'dhcp':
    #   eth.ifconfig((
    #       '192.168.10.201',  # Local IP address
    #       '255.255.255.0',   # Subnet mask
    #       '192.168.10.1',    # Gateway
    #       '192.168.10.1',    # DNS server
    #   ))
    eth.ifconfig('dhcp')
    print('IP address :', eth.ifconfig())

    while True:
        if eth.isconnected():
            break
        time.sleep(1)
        print(eth.regs())
def get_header(msg, header):
    """Return the value that follows ``header`` in an HTTP request text.

    ``header`` is the literal prefix to look for, including the colon and
    the trailing space (e.g. ``'Sec-WebSocket-Key: '``). The value runs from
    the end of that prefix up to the end of the header line.

    Two line-ending spellings are recognised, whichever comes first:

    * a real CR LF pair, as found in properly decoded request text, and
    * the four literal characters ``\\r\\n``, as found in the ``str()``
      repr of a bytes object (which is what ``main`` passes in).

    Returns:
        The header value as a string, or ``None`` when the header is not
        present or its line is not terminated (e.g. a truncated request).
        Previously an unterminated line silently lost its last character.
    """
    start = msg.find(header)
    if start == -1:
        return None

    value = msg[start + len(header):]
    # Collect the positions of every terminator spelling that is present.
    ends = [
        pos
        for pos in (value.find('\r\n'), value.find('\\r\\n'))
        if pos != -1
    ]
    if not ends:
        # No end-of-line marker: the value may be cut short, do not trust it.
        return None
    return value[:min(ends)]
# GUID that RFC 6455 appends to the client key to derive Sec-WebSocket-Accept.
_WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
# Largest frame payload the server accepts; bounds the memory used per frame.
_MAX_PAYLOAD = 4096


def _send_all(conn, data):
    """Send every byte of ``data``, repeating ``send`` after partial writes."""
    while data:
        sent = conn.send(data)
        data = data[sent:]


def _recv_exact(conn, count):
    """Read exactly ``count`` bytes; return None if the peer closes first."""
    data = b''
    while len(data) < count:
        chunk = conn.recv(count - len(data))
        if not chunk:  # zero-length read: TCP FIN
            return None
        data += chunk
    return data


def _read_frame(conn):
    """Read one WebSocket frame; return (first_byte, payload) or None on EOF.

    Handles the 7-bit, 16-bit (126) and 64-bit (127) length encodings and
    removes the client mask when the MASK bit is set. Raises ValueError when
    the announced payload exceeds ``_MAX_PAYLOAD``.
    """
    head = _recv_exact(conn, 2)
    if head is None:
        return None

    length = head[1] & 0x7F
    if length >= 126:
        # 126 -> 2-byte length follows, 127 -> 8-byte length follows.
        extended = _recv_exact(conn, 2 if length == 126 else 8)
        if extended is None:
            return None
        length = int.from_bytes(extended, 'big')
    if length > _MAX_PAYLOAD:
        raise ValueError('WebSocket frame too large: %d bytes' % length)

    mask_key = None
    if head[1] & 0x80:  # payload is masked
        mask_key = _recv_exact(conn, 4)
        if mask_key is None:
            return None

    payload = _recv_exact(conn, length)
    if payload is None:
        return None
    if mask_key is not None:
        payload = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload))
    return head[0], payload


def _send_frame(conn, first_byte, payload):
    """Send one unmasked frame with the given first byte and bytes payload."""
    size = len(payload)
    if size < 126:
        header = bytes((first_byte, size))
    elif size < 0x10000:
        header = bytes((first_byte, 126)) + size.to_bytes(2, 'big')
    else:
        header = bytes((first_byte, 127)) + size.to_bytes(8, 'big')
    _send_all(conn, header + payload)


def _handshake(conn):
    """Answer the HTTP request; return True if the socket is now a WebSocket.

    Sends ``101 Switching Protocols`` for a valid version-13 upgrade request
    and ``501 Not Implemented`` for anything else.
    """
    # str() of a bytes object yields its repr (b'...'), in which CR LF shows
    # up as the literal characters \r\n; get_header() understands that form.
    request = str(conn.recv(1024))

    # The Connection header may list several tokens, e.g. "keep-alive, Upgrade".
    connection = get_header(request, 'Connection: ') or ''
    wants_upgrade = (
        'GET / HTTP/1.1' in request
        and 'Upgrade: websocket' in request
        and 'upgrade' in connection.lower()
    )

    ws_key = None
    ws_ver = None
    if wants_upgrade:
        ws_key = get_header(request, 'Sec-WebSocket-Key: ')
        ws_ver = get_header(request, 'Sec-WebSocket-Version: ')

    if ws_key is not None and ws_ver == '13':
        digest = hashlib.sha1((ws_key + _WS_GUID).encode()).digest()
        # b2a_base64 appends a newline that must not end up in the header.
        ws_accept = ubinascii.b2a_base64(digest).decode('ascii').strip()
        _send_all(conn, (
            'HTTP/1.1 101 Switching Protocols\r\n'
            'Upgrade: websocket\r\n'
            'Connection: Upgrade\r\n'
            'Sec-WebSocket-Accept: {}\r\n'
            'Server: MicroPython on RP2040\r\n'
            '\r\n'
        ).format(ws_accept).encode())
        return True

    body = '501 Not Implemented\r\n'
    _send_all(conn, (
        'HTTP/1.1 501 Not Implemented\r\n'
        'Connection: close\r\n'
        'Content-Type: text/plain\r\n'
        'Content-Length: {}\r\n'
        '\r\n'
        '{}'
    ).format(len(body), body).encode())
    return False


def _echo_session(conn):
    """Echo frames back to the client until it closes the connection."""
    while True:
        frame = _read_frame(conn)
        if frame is None:  # TCP FIN
            return
        first_byte, payload = frame
        opcode = first_byte & 0x0F

        if opcode == 0x08:  # Close: acknowledge and stop
            _send_all(conn, b'\x88\x00')
            print('WebSocket closed')
            return
        if opcode == 0x09:  # Ping: reply with a Pong carrying the same data
            _send_frame(conn, 0x8A, payload)
            continue
        if opcode == 0x0A:  # Pong: no reply is expected
            continue

        if opcode == 0x01:  # Text: decode for display only
            try:
                print(payload.decode('utf-8'))
            except UnicodeError:
                print(payload)
        else:
            print(payload)

        # Echo the payload as bytes, reusing the received FIN/opcode byte.
        _send_frame(conn, first_byte, payload)


def main():
    """Run the WebSocket echo server on TCP port 8777, one client at a time.

    Initialises the network interface, then loops forever: accept a
    connection, perform the HTTP upgrade handshake, and echo every data
    frame until the client closes. ``led`` is driven high while a WebSocket
    session is open. Socket errors and oversized frames end the current
    connection only; the server keeps accepting new clients.
    """
    w5x00_init()
    s = usocket.socket()
    try:
        s.bind(('0.0.0.0', 8777))
        s.listen(5)
        while True:
            conn, addr = s.accept()
            print('Connect from %s' % str(addr))
            upgraded = False
            try:
                upgraded = _handshake(conn)
                if upgraded:
                    led.value(1)
                    _echo_session(conn)
            except (OSError, ValueError) as exc:
                print('Connection error: %s' % exc)
            finally:
                # Always release the client socket and clear the indicator.
                conn.close()
                led.value(0)
            if upgraded:
                print('Connection closed')
    finally:
        s.close()
# Start the echo server only when this file is executed as the main script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment