Created

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

A non working crappy WebSocket server.

View Server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
import socket, hashlib, base64, signal, sys, struct, random
 
"""
Parses a WebSocket frame.
"""
class Frame:
_payloadLen = 0
_payloadStart = 2
_maskStart = 2
_maskData = []
 
fin = False
continues = False
utf8 = False
binary = False
terminate = False
ping = False
pong = False
mask = False
 
def parse (self, data):
self.parseFirstByte(data[0])
self.parseSecondByte(data[1])
 
if self._payloadLen == 126: #16 bit int length
self._payloadLen = (data[2] << 8) + data[3]
self._maskStart += 2
self._payloadStart += 2
 
if self._payloadLen == 127: #64 bit int length
self._payloadLen = (data[2] << 56) + (data[3] << 48) + (data[4] <<40) + (data[5] << 32) + (data[6] << 24) + (data[7] << 16) + (data[8] << 8) + data[9]
self._maskStart += 8
self._payloadStart += 8
 
if True == self.mask:
self._maskData = [
data[self._maskStart],
data[self._maskStart + 1],
data[self._maskStart + 2],
data[self._maskStart + 3]
]
 
def parseFirstByte (self, byte):
self.fin = byte >= 128
opcode = byte
if True == self.fin:
opcode -= 128
self.continues = opcode == 0
self.utf8 = opcode == 1
self.binary = opcode == 2
self.terminate = opcode == 8
self.ping = opcode == 9
self.pong = opcode == 10
 
def parseSecondByte (self, byte):
self.mask = byte >= 128
self._payloadLen = byte
if True == self.mask:
self._payloadStart += 4
self._payloadLen -= 128
 
def getPayload (self, data):
if True == self.mask:
res = bytearray(self._payloadLen)
i = 0
for char in data[self._payloadStart:]:
res.append(char^self._maskData[i%4])
i += 1
 
return res
 
return data[self._payloadStart:]
 
def __str__ (self):
lenthsFrm = " maskStart: {}\n payloadStart: {}\n payloadLen: {}\n"
lenths = lenthsFrm.format(self._maskStart, self._payloadStart, self._payloadLen)
 
flagsFrm = " fin: {}\n continues: {}\n utf8: {}\n binary: {}\n terminate: {}\n ping: {}\n pong: {}\n mask: {}\n"
flags = flagsFrm.format(self.fin, self.continues, self.utf8, self.binary, self.terminate, self.ping, self.pong, self.mask)
 
return "Frame:\n" + lenths + flags
"""
Handles the WebSocket handshake.
"""
class Handshake:
def _buildWebSocketAcceptFromRequestHeader (self, header):
searchTerm = "Sec-WebSocket-Key: "
start = header.find(searchTerm) + len(searchTerm)
end = header.find("\r\n", start)
key = header[start:end]
 
guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
key = (key + guid).encode('utf-8')
sha1 = hashlib.sha1(key).digest()
 
return base64.encodestring(sha1)
 
def _buildResponseHeader (self, key):
return str("HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " +
key.decode('utf-8') +
"\r\n\r\n")
def perform (self, data):
key = self._buildWebSocketAcceptFromRequestHeader(data.decode("utf-8"))
 
return self._buildResponseHeader(key)
 
"""
Unmaks a websocket frame using Frame.
"""
def readFrame (data):
frame = Frame()
frame.parse(data)
return frame.getPayload(data).decode("utf-8")
 
"""
Should create a frame containing the given text msg.
"""
def createFrame (text):
length = len(text)
 
if length <= 125:
ret = bytearray([129, length])
 
for byte in text.encode("utf-8"):
ret.append(byte)
 
for byte in ret:
print(byte)
 
print(ret)
 
return ret
#TODO 16 & 64Bit payload length
 
handshake = Handshake();
 
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.bind(("localhost", int(sys.argv[1])))
soc.listen(1)
conn, addr = soc.accept()
 
data = conn.recv(1024)
conn.sendall(handshake.perform(data).encode("utf-8"))
while 1:
data = conn.recv(1024) #TODO Multiple frames
if len(data) > 0:
print(readFrame(data))
conn.send(createFrame("Hello"))
 
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.