Skip to content

Instantly share code, notes, and snippets.

@dlech
Created December 25, 2020 01:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dlech/cda549987c342bfb0e51f9a3a646d42a to your computer and use it in GitHub Desktop.
Save dlech/cda549987c342bfb0e51f9a3a646d42a to your computer and use it in GitHub Desktop.
WebSocket connection for Pybricks v2.x mailboxes
"""WebSocket server compatible with Pybricks v2.x mailboxes."""
# Ref: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers
# Ref: https://tools.ietf.org/html/rfc6455
# Ref: https://github.com/pybricks/pybricks-micropython/blob/v2/bricks/ev3dev/modules/pybricks/messaging.py
# Ref: https://github.com/pybricks/pybricks-micropython/blob/v2/bricks/ev3dev/modules/pybricks/bluetooth.py
# Ref: https://github.com/python/cpython/blob/3.9/Lib/socketserver.py
from ubinascii import b2a_base64
from uhashlib import sha1
from uerrno import EBADF
from usocket import (getaddrinfo, sockaddr, inet_ntop, socket,
AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR)
from ustruct import pack
from uwebsocket import websocket
from pybricks.bluetooth import ThreadingMixIn
from pybricks.messaging import MailboxHandler, MailboxHandlerMixIn
def server_handshake(sock):
"""Performs WebSocket server handshake to ensure that the socket is a
WebSocket and that it uses the x-ev3-bytecodes-mailbox-only protocol.
"""
clr = sock.makefile("rwb", 0)
line = clr.readline()
webkey = None
protocol = None
while True:
line = clr.readline()
if not line:
raise OSError("EOF in headers")
if line == b"\r\n":
break
h, v = [x.strip() for x in line.split(b":", 1)]
if h == b"Sec-WebSocket-Key":
webkey = v
elif h == b"Sec-WebSocket-Protocol":
protocol = v
if not webkey:
raise RuntimeError("not a websocket request")
if protocol != b"x-ev3-bytecodes-mailbox-only":
raise RuntimeError("expecting x-ev3-bytecodes-mailbox-only protocol")
d = sha1(webkey)
d.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
respkey = d.digest()
respkey = b2a_base64(respkey)[:-1]
sock.send(
b"""\
HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-Websocket-Protocol: x-ev3-bytecodes-mailbox-only\r
Sec-WebSocket-Accept: """
)
sock.send(respkey)
sock.send("\r\n\r\n")
class TCPServer:
"""Object that simplifies setting up an TCP/IP server.
This is based on the ``socketserver.TCPServer`` class in the Python
standard library.
"""
address_family = AF_INET
socket_type = SOCK_STREAM
request_queue_size = 1
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass):
"""Creates a new server object.
Args:
server_address (tuple of str, int):
A tuple containing the IP address, e.g. ``'0.0.0.0'`` and the
port, e.g. ``8080``.
RequestHandlerClass (callable):
Callback function to handle requests.
"""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.socket = socket(self.address_family, self.socket_type)
try:
if self.allow_reuse_address:
self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# MicroPython requires special address object handling to convert
# it to binary form before passing to bind()
ai = getaddrinfo(server_address[0], server_address[1], self.address_family)
addr = ai[0][-1]
self.socket.bind(addr)
# getsockname() isn't implemented in MicroPython
# self.server_address = self.socket.getsockname()
self.socket.listen(self.request_queue_size)
except:
self.server_close()
raise
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.server_close()
def handle_request(self):
try:
request, addr_data = self.socket.accept()
except OSError:
return
try:
family, addr, port = sockaddr(addr_data)
client_address = inet_ntop(family, addr), port
self.process_request(request, client_address)
except:
request.close()
raise
def process_request(self, request, client_address):
self.finish_request(request, client_address)
request.close()
def finish_request(self, request, client_address):
self.RequestHandlerClass(request, client_address, self)
def server_close(self):
self.socket.close()
class ThreadingTCPServer(ThreadingMixIn, TCPServer):
"""Version of :class:`TCPServer` that handles connections in a new
thread.
"""
pass
class WebSocket(websocket):
"""Wrapper around websocket to implement missing methods."""
def __init__(self, socket):
super().__init__(socket, True)
# set WebSocket opcode type to binary instead of text
# 9 = MP_STREAM_SET_DATA_OPTS, 0x02 = binary
self.ioctl(9, 0x2)
def send(self, data):
"""Provides send method for MailboxHandlerMixIn.send_to_mailbox()."""
self.write(data)
def close(self):
# set WebSocket opcode type to close
# 9 = MP_STREAM_SET_DATA_OPTS, 0x08 = close
self.ioctl(9, 0x8)
try:
# close code 1001 means that the server is going away
self.write(pack("!H", 1001))
except OSError:
pass
super().close()
class WebSocketMailboxHandler(MailboxHandler):
"""Class that handles incoming WebSocket requests."""
def setup(self):
server_handshake(self.request)
self.request = self.wfile = self.rfile = WebSocket(self.request)
def handle(self):
try:
super().handle()
except OSError as ex:
# This error can also occur when the file closes. MailboxHandler
# only handles ECONNRESET
if ex.args[0] == EBADF:
return
raise
class WebSocketMailboxServer(MailboxHandlerMixIn, ThreadingTCPServer):
def __init__(self, address="0.0.0.0", port=2013):
"""Object that represents an incoming WebSocket connection."""
super().__init__()
super(ThreadingTCPServer, self).__init__(
(address, port), WebSocketMailboxHandler
)
def wait_for_connection(self, count=1):
"""Waits for a WebSocket client to connect.
Arguments:
count (int):
The number of remote connections to wait for.
Raises:
OSError:
There was a problem establishing the connection.
"""
for _ in range(count):
self.handle_request()
def server_close(self):
for client in self._clients.values():
client.close()
return super().server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment