-
-
Save Cacodaimon/33ff6c3c4b312b074c3e to your computer and use it in GitHub Desktop.
A non-working, crappy WebSocket server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket, hashlib, base64, signal, sys, struct, random | |
""" | |
Parses a WebSocket frame. | |
""" | |
class Frame:
    """Parser for the header of a single WebSocket frame (RFC 6455, 5.2).

    Call ``parse(data)`` with the raw bytes of one frame, then read the
    flag attributes or call ``getPayload(data)`` with the same bytes to
    obtain the (unmasked) payload.
    """

    # Payload length in bytes, as announced by the frame header.
    _payloadLen = 0
    # Index of the first payload byte inside the frame.
    _payloadStart = 2
    # Index of the first byte of the 4-byte masking key (if any).
    _maskStart = 2
    # The four masking-key bytes; empty when the frame is not masked.
    _maskData = []
    # FIN bit: this is the final fragment of a message.
    fin = False
    # Opcode flags, exactly one of which is set for a known opcode.
    continues = False
    utf8 = False
    binary = False
    terminate = False
    ping = False
    pong = False
    # MASK bit: the payload is XOR-masked with ``_maskData``.
    mask = False

    def parse(self, data):
        """Parse the frame header found at the start of ``data``.

        ``data`` must be an indexable sequence of ints (``bytes`` or
        ``bytearray``). Raises ``IndexError`` if the header is truncated.
        """
        # Start from clean offsets so one instance can parse several frames.
        self._payloadStart = 2
        self._maskStart = 2
        self._maskData = []

        self.parseFirstByte(data[0])
        self.parseSecondByte(data[1])

        # ``elif`` matters: a 16-bit extended length whose *value* is 127
        # must not be re-interpreted as the 64-bit length marker.
        if self._payloadLen == 126:  # 16 bit int length
            self._payloadLen = (data[2] << 8) | data[3]
            self._maskStart += 2
            self._payloadStart += 2
        elif self._payloadLen == 127:  # 64 bit int length
            length = 0
            for index in range(2, 10):
                length = (length << 8) | data[index]
            self._payloadLen = length
            self._maskStart += 8
            self._payloadStart += 8

        if self.mask:
            self._maskData = [data[self._maskStart + k] for k in range(4)]

    def parseFirstByte(self, byte):
        """Extract the FIN bit and the opcode flags from the first byte."""
        self.fin = bool(byte & 0x80)
        # The opcode is the low nibble only; bits 4-6 are the RSV bits.
        opcode = byte & 0x0F
        self.continues = opcode == 0
        self.utf8 = opcode == 1
        self.binary = opcode == 2
        self.terminate = opcode == 8
        self.ping = opcode == 9
        self.pong = opcode == 10

    def parseSecondByte(self, byte):
        """Extract the MASK bit and the 7-bit length field from byte two."""
        self.mask = bool(byte & 0x80)
        self._payloadLen = byte & 0x7F
        if self.mask:
            # The payload follows the 4-byte masking key.
            self._payloadStart += 4

    def getPayload(self, data):
        """Return the payload bytes of the frame previously parsed from ``data``.

        Only ``_payloadLen`` bytes are returned, so trailing bytes that
        belong to a following frame are not included. A masked payload is
        unmasked and returned as a ``bytearray``; an unmasked payload is
        returned as a slice of ``data``.
        """
        end = self._payloadStart + self._payloadLen
        body = data[self._payloadStart:end]
        if self.mask:
            key = self._maskData
            return bytearray(
                octet ^ key[i % 4] for i, octet in enumerate(body)
            )
        return body

    def __str__(self):
        """Return a multi-line, human-readable dump of offsets and flags."""
        lenthsFrm = " maskStart: {}\n payloadStart: {}\n payloadLen: {}\n"
        lenths = lenthsFrm.format(
            self._maskStart, self._payloadStart, self._payloadLen
        )
        flagsFrm = " fin: {}\n continues: {}\n utf8: {}\n binary: {}\n terminate: {}\n ping: {}\n pong: {}\n mask: {}\n"
        flags = flagsFrm.format(
            self.fin,
            self.continues,
            self.utf8,
            self.binary,
            self.terminate,
            self.ping,
            self.pong,
            self.mask,
        )
        return "Frame:\n" + lenths + flags
""" | |
Handles the WebSocket handshake. | |
""" | |
class Handshake:
    """Builds the server response of the WebSocket opening handshake."""

    def _buildWebSocketAcceptFromRequestHeader(self, header):
        """Compute the ``Sec-WebSocket-Accept`` value for a request.

        ``header`` is the decoded HTTP request text. The client's
        ``Sec-WebSocket-Key`` header (name matched case-insensitively) is
        concatenated with the protocol GUID, hashed with SHA-1 and
        base64-encoded. Returns the accept token as ``bytes``.

        Raises ``ValueError`` when the request carries no key header.
        """
        key = None
        for line in header.split("\r\n"):
            if not line:
                # Blank line: end of the header section.
                break
            name, sep, value = line.partition(":")
            if sep and name.strip().lower() == "sec-websocket-key":
                key = value.strip()
                break
        if not key:
            raise ValueError("request has no Sec-WebSocket-Key header")

        guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
        digest = hashlib.sha1((key + guid).encode("utf-8")).digest()
        # b64encode adds no trailing newline, which would otherwise end up
        # inside the response header line.
        return base64.b64encode(digest)

    def _buildResponseHeader(self, key):
        """Return the ``101 Switching Protocols`` response text.

        ``key`` is the accept token, as ``bytes`` or ``str``.
        """
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        return (
            "HTTP/1.1 101 Switching Protocols\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            "Sec-WebSocket-Accept: " + key + "\r\n\r\n"
        )

    def perform(self, data):
        """Return the handshake response (``str``) for the raw request ``data``.

        ``data`` is the request as UTF-8 encoded ``bytes``.
        """
        key = self._buildWebSocketAcceptFromRequestHeader(data.decode("utf-8"))
        return self._buildResponseHeader(key)
""" | |
Unmasks a WebSocket frame using Frame.
""" | |
def readFrame(data):
    """Return the payload of the frame in ``data`` decoded as UTF-8 text.

    A fresh ``Frame`` parses the header of ``data``; the payload it
    extracts from the same bytes is then decoded to ``str``.
    """
    parsed = Frame()
    parsed.parse(data)
    payload = parsed.getPayload(data)
    return payload.decode("utf-8")
""" | |
Should create a frame containing the given text msg. | |
""" | |
def createFrame(text):
    """Build an unmasked, final WebSocket text frame carrying ``text``.

    ``text`` is a ``str``; it is encoded as UTF-8 and the frame's length
    field counts the encoded bytes. Payloads up to 125 bytes use the
    7-bit length, up to 65535 bytes the 16-bit extended length, and
    anything larger the 64-bit extended length.

    Returns the complete frame as a ``bytearray``.
    """
    payload = text.encode("utf-8")
    length = len(payload)
    first = 129  # FIN bit set + opcode 1 (text frame)

    if length <= 125:
        ret = bytearray([first, length])
    elif length <= 0xFFFF:
        ret = bytearray(struct.pack("!BBH", first, 126, length))
    else:
        ret = bytearray(struct.pack("!BBQ", first, 127, length))

    ret.extend(payload)
    return ret
# Demo server: accepts ONE client on localhost:<port>, answers the opening
# handshake, then prints each received frame's text and replies "Hello".
if len(sys.argv) < 2:
    sys.exit("usage: python <script> <port>")

handshake = Handshake()

# The context managers close both sockets on every exit path, including
# an exception raised while decoding a frame.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as soc:
    soc.bind(("localhost", int(sys.argv[1])))
    soc.listen(1)
    conn, addr = soc.accept()
    with conn:
        data = conn.recv(1024)
        conn.sendall(handshake.perform(data).encode("utf-8"))
        while True:
            data = conn.recv(1024)  # TODO Multiple frames
            if not data:
                # recv() returns b"" once the peer has closed the
                # connection; without this check the loop never ends.
                break
            print(readFrame(data))
            # sendall() keeps writing until the whole frame is sent.
            conn.sendall(createFrame("Hello"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment