Skip to content

Instantly share code, notes, and snippets.

@jramseygreen
Last active December 18, 2021 13:57
Show Gist options
  • Save jramseygreen/bf6c82f356df0e49bd6fb0609029c34e to your computer and use it in GitHub Desktop.
Save jramseygreen/bf6c82f356df0e49bd6fb0609029c34e to your computer and use it in GitHub Desktop.
Bare-bones WebSocket server in Python
<script>
// Demo client: once the page has loaded, open a WebSocket to the local
// server on port 9876, send one greeting, and log whatever comes back.
window.onload = function () {
  var connection = new WebSocket("ws://localhost:9876/");

  // Send the greeting as soon as the connection is open.
  connection.onopen = function () {
    connection.send('hello world');
  };

  // The handler receives an Event object. Pass it as a separate argument
  // so the console can show the object itself; concatenating it into the
  // message would only print "[object Event]".
  connection.onerror = function (error) {
    console.log('WebSocket Error', error);
  };

  // Log every message pushed by the server.
  connection.onmessage = function (e) {
    console.log('Server: ' + e.data);
  };
};
</script>
# Usage example: acknowledge every message a client sends.
from ws_server import ws_server


def on_message(conn, msg):
    """Print the incoming message and send an acknowledgement to its sender."""
    print(conn, msg)
    ws.send(conn, "received " + msg)


# Register the callback through the constructor, then serve until interrupted.
ws = ws_server(host="localhost", port=9876, on_message_function=on_message)
ws.listen()
import hashlib
import base64
import socket
import struct
import six
import threading
class ws_server:
    """Bare-bones threaded WebSocket server that exchanges text messages.

    Each accepted connection is served on its own thread: the opening
    handshake is performed, then incoming frames are parsed and every
    complete data message is passed to the registered callback as
    ``callback(conn, text)``.

    Parameters:
        host: Interface to bind to.
        port: TCP port to bind to.
        on_message_function: Optional callback ``f(conn, text)``; it can
            also be set later with :meth:`on_message`.
    """

    # Fixed GUID that is appended to the client's key to build the
    # Sec-WebSocket-Accept value.
    _GUID = b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    # Upper bound on buffered handshake bytes so a client that never ends
    # its headers cannot grow the buffer without limit.
    _MAX_HANDSHAKE_BYTES = 65536

    # Frame opcodes.
    _OP_CONTINUATION = 0x0
    _OP_TEXT = 0x1
    _OP_BINARY = 0x2
    _OP_CLOSE = 0x8
    _OP_PING = 0x9
    _OP_PONG = 0xA

    def __init__(self, host='localhost', port=80, on_message_function=None):
        self.running = True
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.on_message_function = on_message_function

    def handshake(self, conn):
        """Perform the HTTP upgrade handshake on ``conn``.

        Reads until the end of the request headers (or the peer closes, or
        the size cap is hit), looks up ``Sec-WebSocket-Key`` ignoring the
        case of the header name and any whitespace around the value, and
        answers with ``101 Switching Protocols``.

        Returns:
            True when the upgrade response was sent, False when the request
            carried no key and ``400 Bad Request`` was sent instead.
        """
        request = b''
        while (b'\r\n\r\n' not in request
               and len(request) < self._MAX_HANDSHAKE_BYTES):
            chunk = conn.recv(1024)
            if not chunk:
                break
            request += chunk

        websocket_key = b''
        for line in request.splitlines():
            name, _, value = line.partition(b':')
            if name.strip().lower() == b'sec-websocket-key':
                websocket_key = value.strip()
                break

        if not websocket_key:
            conn.sendall(b'HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n')
            return False

        digest = hashlib.sha1(websocket_key + self._GUID).digest()
        accept_key = base64.b64encode(digest)
        conn.sendall(
            b'HTTP/1.1 101 Switching Protocols\r\n'
            b'Upgrade: websocket\r\n'
            b'Connection: Upgrade\r\n'
            b'Sec-WebSocket-Accept: ' + accept_key + b'\r\n\r\n'
        )
        return True

    @staticmethod
    def _encode_frame(payload, opcode):
        """Build one final, unmasked frame carrying ``payload`` (bytes)."""
        length = len(payload)
        header = bytearray([0x80 | opcode])  # FIN bit set, RSV bits clear
        if length < 126:
            header.append(length)
        elif length < 1 << 16:
            header.append(126)
            header += struct.pack('!H', length)
        else:
            header.append(127)
            header += struct.pack('!Q', length)
        return bytes(header) + payload

    @staticmethod
    def _unmask(payload, mask):
        """XOR ``payload`` with the repeating 4-byte ``mask``."""
        return bytes(byte ^ mask[i % 4] for i, byte in enumerate(payload))

    def _parse_frame(self, buffer):
        """Parse the first frame at the start of ``buffer``.

        Returns:
            ``(fin, opcode, payload, consumed)`` where ``consumed`` is the
            number of bytes the frame occupies, or None when ``buffer``
            does not yet hold a complete frame.
        """
        if len(buffer) < 2:
            return None
        fin = bool(buffer[0] & 0x80)
        opcode = buffer[0] & 0x0F
        masked = bool(buffer[1] & 0x80)
        length = buffer[1] & 0x7F
        offset = 2
        if length == 126:
            if len(buffer) < 4:
                return None
            (length,) = struct.unpack_from('!H', buffer, 2)
            offset = 4
        elif length == 127:
            if len(buffer) < 10:
                return None
            (length,) = struct.unpack_from('!Q', buffer, 2)
            offset = 10
        mask = b''
        if masked:
            if len(buffer) < offset + 4:
                return None
            mask = bytes(buffer[offset:offset + 4])
            offset += 4
        end = offset + length
        if len(buffer) < end:
            return None
        payload = bytes(buffer[offset:end])
        if masked:
            payload = self._unmask(payload, mask)
        return fin, opcode, payload, end

    def ws_encode(self, data=""):
        """Encode ``data`` as a single unmasked text frame.

        ``str`` input is UTF-8 encoded first; bytes-like input is sent
        as-is (still with the text opcode).
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        return self._encode_frame(data, self._OP_TEXT)

    def ws_decode(self, data):
        """Decode the payload of the first frame in ``data`` as UTF-8 text.

        Only the number of payload bytes declared in the frame header is
        decoded, so bytes belonging to a following frame are ignored. The
        mask is applied only when the frame's mask bit is set. If ``data``
        is truncated, whatever payload is present is decoded.

        Raises:
            IndexError: ``data`` is shorter than the 2-byte frame header.
            UnicodeDecodeError: the payload is not valid UTF-8.
        """
        frame = bytearray(data)
        masked = frame[1] & 0x80
        length = frame[1] & 0x7F
        offset = 2
        if length == 126:
            length = int.from_bytes(frame[2:4], 'big')
            offset = 4
        elif length == 127:
            length = int.from_bytes(frame[2:10], 'big')
            offset = 10
        if masked:
            mask = frame[offset:offset + 4]
            offset += 4
            payload = self._unmask(frame[offset:offset + length], mask)
        else:
            payload = bytes(frame[offset:offset + length])
        return payload.decode('utf-8')

    def listen(self):
        """Bind, listen, and serve each accepted connection on a new thread.

        The handshake runs on the per-connection thread, so a slow or
        failing client cannot stall or terminate the accept loop.
        """
        self.sock.bind((self.host, self.port))
        self.sock.listen()
        while self.running:
            conn, _addr = self.sock.accept()
            threading.Thread(target=self._serve_client, args=(conn,)).start()

    def _serve_client(self, conn):
        """Run the handshake for ``conn`` and then its message loop."""
        try:
            accepted = self.handshake(conn)
        except OSError:
            accepted = False
        # Only an explicit False rejects, so a handshake override that
        # returns None still proceeds to the message loop.
        if accepted is False:
            conn.close()
            return
        self.client_thread(conn)

    def send(self, conn, msg):
        """Send ``msg`` to ``conn`` as one text frame."""
        # sendall() keeps writing until the whole frame is out; send() may
        # stop after a partial write.
        conn.sendall(self.ws_encode(msg))

    def on_message(self, func):
        """Register ``func(conn, text)`` as the incoming-message callback."""
        self.on_message_function = func

    def client_thread(self, conn):
        """Receive frames from ``conn`` until it closes, dispatching messages.

        Received bytes are buffered so that frames split across reads or
        several frames arriving in one read are handled. Fragmented
        messages are reassembled, pings are answered with pongs, a close
        frame is echoed and ends the loop. Socket errors, invalid UTF-8 and
        unknown opcodes end the connection quietly; any other exception
        raised by the callback propagates after the socket is closed.
        Messages are dropped when no callback is registered.
        """
        buffer = bytearray()
        message = bytearray()
        try:
            while True:
                chunk = conn.recv(4096)
                if not chunk:  # peer closed the connection
                    return
                buffer += chunk
                while True:
                    parsed = self._parse_frame(buffer)
                    if parsed is None:
                        break  # need more bytes for the next frame
                    fin, opcode, payload, consumed = parsed
                    del buffer[:consumed]
                    if opcode == self._OP_CLOSE:
                        conn.sendall(self._encode_frame(b'', self._OP_CLOSE))
                        return
                    if opcode == self._OP_PING:
                        conn.sendall(self._encode_frame(payload, self._OP_PONG))
                    elif opcode == self._OP_PONG:
                        pass
                    elif opcode in (self._OP_CONTINUATION, self._OP_TEXT,
                                    self._OP_BINARY):
                        message += payload
                        if fin:
                            text = message.decode('utf-8')
                            message = bytearray()
                            callback = self.on_message_function
                            if callback is not None:
                                callback(conn, text)
                    else:
                        raise ValueError('unsupported opcode %#x' % opcode)
        except (OSError, ValueError):
            # Broken socket or malformed traffic: drop this client.
            pass
        finally:
            conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment