Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A very simple WebSocketHandler to use with a socketserver.ForkingTCPServer in python3
import http.server
import struct
from base64 import b64encode
from hashlib import sha1
import errno, socket #for socket exceptions
import threading
class WebSocketError(Exception):
    """Raised when a WebSocket read is aborted on a still-open connection."""
class HTTPWebSocketsHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP request handler that can upgrade a GET request to a WebSocket.

    Plain GET requests are served by ``SimpleHTTPRequestHandler``.  A GET
    carrying ``Upgrade: websocket`` is answered with the WebSocket handshake,
    after which ``do_GET`` keeps reading frames until the connection closes.

    Subclasses override ``on_ws_message``, ``on_ws_connected`` and
    ``on_ws_closed`` and call ``send_message`` to push data to the client.
    If the server object has a truthy ``auth`` attribute, requests must carry
    a matching ``Authorization: Basic <auth>`` header.
    """

    # Fixed GUID appended to the client key when computing the accept digest.
    _ws_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

    # Frame opcodes (low nibble of the first frame byte).
    _opcode_continu = 0x0
    _opcode_text = 0x1
    _opcode_binary = 0x2
    _opcode_close = 0x8
    _opcode_ping = 0x9
    _opcode_pong = 0xa

    # Class-level lock guarding the connected -> closed transition.
    mutex = threading.Lock()

    def on_ws_message(self, message):
        """Override this handler to process incoming websocket messages.

        ``message`` is a ``str`` holding one character per payload byte
        (code points 0-255), for text, binary and continuation frames alike.
        """
        pass

    def on_ws_connected(self):
        """Override this handler; called once the handshake has been sent."""
        pass

    def on_ws_closed(self):
        """Override this handler; called once when the websocket is closed."""
        pass

    def send_message(self, message):
        """Send ``message`` (``str`` or bytes-like) as one text frame.

        Returns True on success, False if sending failed (in which case the
        websocket has been closed).
        """
        return self._send_message(self._opcode_text, message)

    def setup(self):
        """Prepare the request streams and start in non-websocket mode."""
        http.server.SimpleHTTPRequestHandler.setup(self)
        self.connected = False

    def checkAuthentication(self):
        """Check the Basic ``Authorization`` header against ``server.auth``.

        Sends a 401 challenge and returns False on mismatch, else True.
        """
        auth = self.headers.get('Authorization')
        if auth != "Basic %s" % self.server.auth:
            self.send_response(401)
            self.send_header("WWW-Authenticate", 'Basic realm="realm"')
            self.end_headers()
            return False
        return True

    def do_GET(self):
        """Serve a GET request, switching to websocket mode when requested."""
        if getattr(self.server, 'auth', None) and not self.checkAuthentication():
            return
        # Header tokens are case-insensitive, so normalise before comparing.
        if self.headers.get("Upgrade", "").lower() == "websocket":
            self._handshake()
            # This handler is in websocket mode now.
            # do_GET only returns after client close or socket error.
            # (If the handshake was rejected, connected stays False and the
            # read loop below does not run.)
            self._read_messages()
        else:
            http.server.SimpleHTTPRequestHandler.do_GET(self)

    def _read_messages(self):
        """Read frames until the websocket is no longer connected."""
        while self.connected:
            try:
                self._read_next_message()
            except Exception as err:
                # Unexpected error in websocket connection (including errors
                # raised by user callbacks): log it and close.
                # The text is passed as a logging argument so a '%' inside it
                # cannot break the logger's own formatting.
                self.log_error("RCV: Exception: in _read_messages: %s",
                               str(err.args))
                self._ws_close()

    def _read_exact(self, count):
        """Read exactly ``count`` bytes from rfile or raise EOFError."""
        data = self.rfile.read(count)
        if len(data) != count:
            raise EOFError("connection closed in the middle of a frame")
        return data

    def _read_next_message(self):
        """Read one frame from the client and dispatch it to _on_message.

        Raises WebSocketError if the stream ends while the websocket is
        still marked connected.
        """
        # self.rfile.read(n) is blocking.
        # It returns however immediately when the socket is closed.
        try:
            first, second = self._read_exact(2)
            self.opcode = first & 0x0F
            masked = bool(second & 0x80)
            length = second & 0x7F
            if length == 126:
                length = struct.unpack(">H", self._read_exact(2))[0]
            elif length == 127:
                length = struct.unpack(">Q", self._read_exact(8))[0]
            # The masking key is only present when the MASK bit is set.
            mask = self._read_exact(4) if masked else None
            payload = self._read_exact(length)
        except (EOFError, struct.error, TypeError) as e:
            if self.connected:
                raise WebSocketError(
                    "Websocket read aborted while listening") from e
            # The socket was closed while waiting for input.
            self.log_error(
                "RCV: _read_next_message aborted after closed connection")
            return
        if mask is not None:
            payload = bytes(
                byte ^ mask[index % 4] for index, byte in enumerate(payload))
        # latin-1 maps every byte to the character with the same code point,
        # which is the representation on_ws_message has always received.
        self._on_message(payload.decode('latin-1'))

    def _send_message(self, opcode, message):
        """Send one unfragmented frame with the given opcode and payload.

        ``message`` may be ``str`` (encoded as UTF-8) or bytes-like.
        Returns True on success; on any error logs it, closes the websocket
        and returns False.
        """
        try:
            if not isinstance(message, (bytes, bytearray)):
                message = message.encode()
            length = len(message)
            # First byte: FIN bit plus opcode; then the length encoding.
            if length <= 125:
                header = bytes([0x80 | opcode, length])
            elif length <= 65535:
                header = bytes([0x80 | opcode, 126]) + struct.pack(">H", length)
            else:
                header = bytes([0x80 | opcode, 127]) + struct.pack(">Q", length)
            # Use of self.wfile.write gives socket exception after socket is
            # closed. Avoid. sendall() with the whole frame guarantees that
            # nothing is silently left unsent and keeps the frame contiguous.
            self.request.sendall(header + message)
            return True
        except Exception as err:
            # Unexpected error in websocket connection.
            self.log_error("SND: Exception: in _send_message: %s",
                           str(err.args))
            self._ws_close()
            return False

    def _handshake(self):
        """Answer a websocket upgrade request and enter websocket mode.

        Does nothing if the request is not a websocket upgrade; replies with
        400 if the ``Sec-WebSocket-Key`` header is missing.
        """
        headers = self.headers
        if headers.get("Upgrade", "").lower() != "websocket":
            return
        key = headers.get('Sec-WebSocket-Key')
        if not key:
            self.send_error(400, "Missing Sec-WebSocket-Key header")
            return
        sha = sha1((key + self._ws_GUID).encode()).digest()
        digest = b64encode(sha).decode()
        self.send_response(101, 'Switching Protocols')
        self.send_header('Upgrade', 'websocket')
        self.send_header('Connection', 'Upgrade')
        self.send_header('Sec-WebSocket-Accept', digest)
        self.end_headers()
        self.connected = True
        self.on_ws_connected()

    def _ws_close(self):
        """Close the websocket once: send a close frame, run on_ws_closed."""
        # Avoid closing a single socket two times for send and receive:
        # only the caller that flips connected from True to False proceeds.
        with self.mutex:
            was_connected = self.connected
            self.connected = False
        if not was_connected:
            self.log_message("_ws_close websocket in closed state. Ignore.")
            return
        # Terminate BaseHTTPRequestHandler.handle() loop:
        self.close_connection = True
        # Send close and ignore socket errors. An error may already have
        # occurred on this connection.
        try:
            self._send_close()
        except OSError:
            pass
        # Called outside the lock so the callback may safely use this handler.
        self.on_ws_closed()

    def _on_message(self, message):
        """Dispatch a decoded frame payload according to self.opcode."""
        if self.opcode == self._opcode_close:
            self._ws_close()
        elif self.opcode == self._opcode_ping:
            # Echo the payload bytes unchanged; latin-1 reverses the decoding
            # done in _read_next_message.
            self._send_message(self._opcode_pong, message.encode('latin-1'))
        elif self.opcode == self._opcode_pong:
            pass
        elif self.opcode in (self._opcode_continu,
                             self._opcode_text,
                             self._opcode_binary):
            self.on_ws_message(message)

    def _send_close(self):
        """Send an empty close frame; socket errors propagate to the caller."""
        # Dedicated _send_close allows for catch-all exception handling.
        self.request.sendall(bytes([0x80 | self._opcode_close, 0x00]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment