Skip to content

Instantly share code, notes, and snippets.

@op01
Created May 6, 2015 10:52
Show Gist options
  • Save op01/0bb0e4bd3d073cb3afb2 to your computer and use it in GitHub Desktop.
Save op01/0bb0e4bd3d073cb3afb2 to your computer and use it in GitHub Desktop.
Simple echo WebSocket server implemented in Python
import socket,select,time,re,base64,hashlib
def debug(b):
    """Dump ``b`` to stdout between a "txt:" line and an "ENDtxt" line.

    The value is handed to ``print`` unchanged, so a ``bytes`` argument is
    shown as its ``b'...'`` form rather than as decoded text.
    """
    for line in ("txt:", b, "ENDtxt"):
        print(line)
def getAll(cs, bufsize=4096):
    """Read everything currently pending on socket ``cs`` without blocking.

    The socket is polled with a zero-timeout ``select`` before every
    ``recv``, so the call returns as soon as no more data is immediately
    available.

    Args:
        cs: a connected socket object.
        bufsize: maximum number of bytes requested per ``recv`` call.

    Returns:
        The bytes read (``b""`` when nothing was pending), or ``None`` when
        ``recv`` returned an empty result, i.e. the peer closed the
        connection. In that case anything read earlier in the same call is
        discarded.
    """
    chunks = []
    while select.select([cs], [], [], 0)[0]:
        chunk = cs.recv(bufsize)
        if not chunk:
            return None
        chunks.append(chunk)
    # Join once at the end instead of growing a bytes object chunk by chunk.
    return b"".join(chunks)
def unmask(data):
    """Decode every complete WebSocket frame contained in ``data``.

    For each frame the payload length (7-bit, 16-bit or 64-bit form) and the
    optional 4-byte masking key are parsed, the payload is XOR-ed with the
    key, and the same debug lines as before are printed (size, mask, the
    remaining raw bytes, the decoded payload). The first byte of each frame
    (FIN flag / opcode) is not inspected.

    Parsing stops at the first frame that is not fully present in ``data``
    instead of raising ``IndexError``.

    Args:
        data: ``bytes`` (or ``bytearray``) holding zero or more frames back
            to back.

    Returns:
        A list with the decoded payload (``bytes``) of each complete frame,
        in the order the frames appear.
    """
    payloads = []
    total = len(data)
    pos = 0
    while pos < total:
        # A frame header is at least two bytes long.
        if total - pos < 2:
            print("incomplete frame")
            break
        masked = bool(data[pos + 1] & 0b10000000)
        size = data[pos + 1] & 0b1111111
        offset = pos + 2
        # 126 and 127 announce a 2-byte / 8-byte big-endian extended length.
        if size == 126:
            ext = 2
        elif size == 127:
            ext = 8
        else:
            ext = 0
        if ext:
            if offset + ext > total:
                print("incomplete frame")
                break
            size = int.from_bytes(data[offset:offset + ext], byteorder='big')
            offset += ext
        print("size=%d" % size)
        mask_len = 4 if masked else 0
        if offset + mask_len + size > total:
            print("incomplete frame")
            break
        mask = data[offset:offset + mask_len]
        print("mask:%r" % mask)
        offset += mask_len
        print("data:%r" % data[offset:])
        body = data[offset:offset + size]
        if masked:
            ret = bytes(b ^ mask[i % 4] for i, b in enumerate(body))
        else:
            # Without the MASK bit the payload is already in the clear.
            ret = bytes(body)
        print(ret)
        payloads.append(ret)
        pos = offset + size
    return payloads
# Single-client WebSocket echo server: accept one connection, answer the
# opening handshake, then echo whatever the client sends until it disconnects.
host = ''
port = 8080
size = 1024  # NOTE(review): not read anywhere in the visible code
# Fixed GUID that the WebSocket handshake appends to the client's key before
# hashing (RFC 6455, opening handshake).
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
# Seconds to wait for (more of) the HTTP upgrade request before giving up.
HANDSHAKE_TIMEOUT = 10

# ``with`` closes the listening and client sockets on every exit path.
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
    s.bind((host, port))
    s.listen(1)
    s.setblocking(0)
    # Poll once per second until a client is waiting to be accepted.
    while len(select.select([s], [], [], 0)[0]) == 0:
        time.sleep(1)
        print("LOOP")
    client, address = s.accept()
    with client:
        client.setblocking(0)
        print("Incoming connection from ", address)

        # The upgrade request may not have arrived yet right after accept(),
        # so keep reading until the blank line that ends the HTTP headers.
        data = b""
        while b"\r\n\r\n" not in data:
            if not select.select([client], [], [], HANDSHAKE_TIMEOUT)[0]:
                raise SystemExit("timed out waiting for the WebSocket handshake")
            chunk = getAll(client)
            if chunk is None:
                raise SystemExit("client closed the connection before the handshake")
            data += chunk
        debug(data)

        # HTTP header names are case-insensitive, hence re.I.
        match = re.search(r'^Sec-WebSocket-Key\s*:\s*(.+?)\r?$',
                          data.decode('UTF-8'), re.M | re.I)
        if match is None:
            raise SystemExit("request has no Sec-WebSocket-Key header")
        wskey = match.group(1).strip()
        print("wskey=[%r]" % wskey)

        accept_token = base64.b64encode(
            hashlib.sha1((wskey + GUID).encode()).digest()).decode('UTF-8')
        # HTTP requires CRLF line endings and an empty line after the headers;
        # without them the client keeps waiting for the response to finish.
        ret = (
            "HTTP/1.1 101 Switching Protocols\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            "Sec-WebSocket-Accept: %s\r\n"
            "\r\n"
        ) % accept_token
        ret = ret.encode()
        debug(ret)
        client.send(ret)

        while True:
            data = getAll(client)
            if data is None:
                # Peer closed the connection.
                break
            if data:
                unmask(data)
                # NOTE(review): the received bytes are echoed verbatim, i.e.
                # still masked. RFC 6455 says server-to-client frames must not
                # be masked, so a compliant client will likely drop the
                # connection; re-encoding the decoded payload as an unmasked
                # frame would make the echo compliant.
                client.send(data)
            time.sleep(1)
            print("LOOP")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment