Skip to content

Instantly share code, notes, and snippets.

@Cacodaimon
Created February 17, 2014 22:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Cacodaimon/33ff6c3c4b312b074c3e to your computer and use it in GitHub Desktop.
Save Cacodaimon/33ff6c3c4b312b074c3e to your computer and use it in GitHub Desktop.
A non working crappy WebSocket server.
import socket, hashlib, base64, signal, sys, struct, random
"""
Parses a WebSocket frame.
"""
class Frame:
    """Parser for the header of one WebSocket frame (RFC 6455 base framing).

    Call :meth:`parse` with the raw bytes of a frame to populate the flag
    attributes and the internal offsets, then :meth:`getPayload` with the
    same bytes to obtain the (unmasked) payload.

    Flag attributes set by :meth:`parse`:
        fin        -- FIN bit is set (final fragment of a message)
        continues  -- opcode 0x0 (continuation frame)
        utf8       -- opcode 0x1 (text frame)
        binary     -- opcode 0x2 (binary frame)
        terminate  -- opcode 0x8 (connection close)
        ping       -- opcode 0x9
        pong       -- opcode 0xA
        mask       -- MASK bit is set (payload is XOR-masked)
    """

    # Class-level defaults; parse() rebinds all of these on the instance.
    _payloadLen = 0     # payload length in bytes, after extended-length decoding
    _payloadStart = 2   # offset of the first payload byte within the frame
    _maskStart = 2      # offset of the 4-byte masking key within the frame
    _maskData = []      # the masking key as a list of four ints
    fin = False
    continues = False
    utf8 = False
    binary = False
    terminate = False
    ping = False
    pong = False
    mask = False

    def parse(self, data):
        """Parse the frame header found at the start of ``data``.

        ``data`` is an indexable sequence of byte values (``bytes`` or
        ``bytearray``) starting at the first byte of a frame.

        Raises:
            IndexError: if ``data`` is too short to contain the full header.
        """
        # Reset the offsets so that one instance can parse several frames
        # without the "+=" adjustments below accumulating.
        self._payloadStart = 2
        self._maskStart = 2
        self._maskData = []

        self.parseFirstByte(data[0])
        self.parseSecondByte(data[1])

        # 126 / 127 in the 7-bit length field announce a 16-bit / 64-bit
        # big-endian extended length. The branches are exclusive: a decoded
        # 16-bit length may itself equal 127 and must not be re-interpreted.
        extended = 0
        if self._payloadLen == 126:
            extended = 2
        elif self._payloadLen == 127:
            extended = 8
        if extended:
            if len(data) < 2 + extended:
                raise IndexError("frame truncated inside extended payload length")
            self._payloadLen = int.from_bytes(data[2:2 + extended], "big")
            self._maskStart += extended
            self._payloadStart += extended

        if self.mask:
            key = data[self._maskStart:self._maskStart + 4]
            if len(key) < 4:
                raise IndexError("frame truncated inside masking key")
            self._maskData = list(key)

    def parseFirstByte(self, byte):
        """Decode the FIN bit and the opcode from the first header byte."""
        self.fin = bool(byte & 0x80)
        # The opcode is the low nibble; bits 4-6 are RSV1-3 and must not
        # leak into the opcode comparison.
        opcode = byte & 0x0F
        self.continues = opcode == 0
        self.utf8 = opcode == 1
        self.binary = opcode == 2
        self.terminate = opcode == 8
        self.ping = opcode == 9
        self.pong = opcode == 10

    def parseSecondByte(self, byte):
        """Decode the MASK bit and the 7-bit length from the second header byte.

        When the MASK bit is set the payload offset is advanced by the four
        bytes of the masking key.
        """
        self.mask = bool(byte & 0x80)
        self._payloadLen = byte & 0x7F
        if self.mask:
            self._payloadStart += 4

    def getPayload(self, data):
        """Return the payload of the frame previously parsed from ``data``.

        Only ``_payloadLen`` bytes are returned, so bytes belonging to a
        following frame in the same buffer are not included. If the buffer
        holds fewer bytes than announced, the available part is returned.

        Returns:
            A ``bytearray`` with the unmasked payload when the frame is
            masked, otherwise a slice of ``data``.
        """
        end = self._payloadStart + self._payloadLen
        body = data[self._payloadStart:end]
        if self.mask:
            key = self._maskData
            # Masking is a byte-wise XOR with the key repeated every 4 bytes.
            return bytearray(b ^ key[i % 4] for i, b in enumerate(body))
        return body

    def __str__(self):
        """Return a multi-line, human-readable dump of offsets and flags."""
        lengths = " maskStart: {}\n payloadStart: {}\n payloadLen: {}\n".format(
            self._maskStart, self._payloadStart, self._payloadLen
        )
        flags = (
            " fin: {}\n continues: {}\n utf8: {}\n binary: {}\n"
            " terminate: {}\n ping: {}\n pong: {}\n mask: {}\n"
        ).format(
            self.fin, self.continues, self.utf8, self.binary,
            self.terminate, self.ping, self.pong, self.mask,
        )
        return "Frame:\n" + lengths + flags
"""
Handles the WebSocket handshake.
"""
class Handshake:
    """Builds the server side of the WebSocket opening handshake."""

    def _buildWebSocketAcceptFromRequestHeader(self, header):
        """Compute the ``Sec-WebSocket-Accept`` value for a client request.

        Args:
            header: the decoded HTTP upgrade request (``str``), header lines
                separated by CRLF.

        Returns:
            The base64-encoded SHA-1 digest of the client key concatenated
            with the protocol GUID, as ``bytes`` without a trailing newline.

        Raises:
            ValueError: if the request carries no ``Sec-WebSocket-Key`` header.
        """
        key = None
        for line in header.split("\r\n"):
            name, sep, value = line.partition(":")
            # HTTP header names are case-insensitive and the value may be
            # surrounded by optional whitespace.
            if sep and name.strip().lower() == "sec-websocket-key":
                key = value.strip()
                break
        if not key:
            raise ValueError("handshake request has no Sec-WebSocket-Key header")

        guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
        # SHA-1 is mandated by the protocol here; it is not a security choice.
        digest = hashlib.sha1((key + guid).encode("utf-8")).digest()
        # b64encode, unlike the removed base64.encodestring, appends no "\n"
        # that would corrupt the response header line.
        return base64.b64encode(digest)

    def _buildResponseHeader(self, key):
        """Return the ``101 Switching Protocols`` response as a ``str``.

        Args:
            key: the accept value as UTF-8 ``bytes``.
        """
        return (
            "HTTP/1.1 101 Switching Protocols\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            "Sec-WebSocket-Accept: " + key.decode("utf-8") + "\r\n\r\n"
        )

    def perform(self, data):
        """Build the handshake response for the raw request bytes ``data``.

        Returns:
            The complete HTTP response text (``str``), ready to be encoded
            and sent to the client.

        Raises:
            ValueError: if the request carries no ``Sec-WebSocket-Key`` header.
        """
        accept = self._buildWebSocketAcceptFromRequestHeader(data.decode("utf-8"))
        return self._buildResponseHeader(accept)
"""
Unmasks a WebSocket frame using Frame.
"""
def readFrame(data):
    """Decode one WebSocket frame from ``data`` and return its payload as text.

    The frame header is parsed with ``Frame``, the payload is extracted
    (unmasked if the frame is masked) and decoded as UTF-8.
    """
    parsed = Frame()
    parsed.parse(data)
    payload = parsed.getPayload(data)
    return payload.decode("utf-8")
"""
Should create a frame containing the given text msg.
"""
def createFrame(text):
    """Build an unmasked, final WebSocket text frame carrying ``text``.

    Args:
        text: the message to send (``str``); it is encoded as UTF-8.

    Returns:
        A ``bytearray`` holding the complete frame: the FIN + text-opcode
        byte, the payload length (7-bit, or 126 + 16-bit, or 127 + 64-bit,
        big-endian) and the encoded payload.
    """
    payload = text.encode("utf-8")
    # The length field counts encoded bytes, not characters.
    length = len(payload)

    frame = bytearray([0x81])  # FIN set, opcode 0x1 (text)
    if length <= 125:
        frame.append(length)
    elif length <= 0xFFFF:
        frame.append(126)
        frame += struct.pack("!H", length)
    else:
        frame.append(127)
        frame += struct.pack("!Q", length)
    frame += payload
    return frame
def main(argv=None):
    """Run the demo server for a single client connection.

    Listens on ``localhost`` at the port given as the first command-line
    argument, accepts one client, answers its opening handshake and then
    replies with a "Hello" text frame to every chunk received, printing the
    decoded payload of each. Returns when the client closes the connection.

    Args:
        argv: argument list to use instead of ``sys.argv`` (program name
            first, port second).
    """
    args = sys.argv if argv is None else argv
    if len(args) < 2:
        sys.exit("usage: {} PORT".format(args[0] if args else "server"))
    port = int(args[1])

    handshake = Handshake()
    # The context managers close both sockets even when a frame fails to parse.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as soc:
        soc.bind(("localhost", port))
        soc.listen(1)
        conn, addr = soc.accept()
        with conn:
            data = conn.recv(1024)
            conn.sendall(handshake.perform(data).encode("utf-8"))
            while True:
                data = conn.recv(1024)  # TODO: multiple / fragmented frames
                if not data:
                    # recv() returns b"" once the peer has closed; without
                    # this exit the loop would spin forever.
                    break
                print(readFrame(data))
                # sendall, because send() may write only part of the frame.
                conn.sendall(createFrame("Hello"))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment