Skip to content

Instantly share code, notes, and snippets.

@nathan-heskia
Last active January 8, 2022 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathan-heskia/1d42fba6df79cee8d313dc3adf41dc28 to your computer and use it in GitHub Desktop.
Save nathan-heskia/1d42fba6df79cee8d313dc3adf41dc28 to your computer and use it in GitHub Desktop.
Following the WebSocket protocol with a Python TCP socket
import _thread
import base64
import os
import random
import socket
import ssl
import sys
import time
# WebSocket opcodes (RFC 6455, section 5.2), keyed by the frame-type name
# that callers pass to get_metadata().
opcodes = {
    'continuation': 0,
    'text': 1,
    'binary': 2,
    'close': 8,
    'ping': 9,
    'pong': 10
}

# Base framing layout (RFC 6455, section 5.2):
#
#  0                   1                   2                   3
#  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
# +-+-+-+-+-------+-+-------------+-------------------------------+
# |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
# |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
# |N|V|V|V|       |S|             |   (if payload len==126/127)   |
# | |1|2|3|       |K|             |                               |
# +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
# |     Extended payload length continued, if payload len == 127  |
# + - - - - - - - - - - - - - - - +-------------------------------+
# |                               |Masking-key, if MASK set to 1  |
# +-------------------------------+-------------------------------+
# | Masking-key (continued)       |          Payload Data         |
# +-------------------------------- - - - - - - - - - - - - - - - +
# :                     Payload Data continued ...                :
# + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
# |                     Payload Data continued ...                |
# +---------------------------------------------------------------+


def get_metadata(fin, opcode, masked, payload):
    """Build a complete WebSocket frame: header, optional mask key, payload.

    Args:
        fin: Truthy to set the FIN bit (this is the final fragment).
        opcode: Frame-type name; must be a key of ``opcodes``.
        masked: Truthy to set the MASK bit, emit a random 4-byte masking
            key and XOR the payload with it. When falsy the payload is
            appended as-is.
        payload: The frame body. A ``str`` is encoded as UTF-8; any other
            value is converted with ``bytes()``.

    Returns:
        A ``bytearray`` holding the encoded frame.

    Raises:
        KeyError: If ``opcode`` is not a key of ``opcodes``.
    """
    # The length field counts bytes on the wire, so encode text first.
    if isinstance(payload, str):
        payload = payload.encode('utf-8')
    else:
        payload = bytes(payload)
    payload_len = len(payload)

    frame = bytearray()
    frame.append((0x80 if fin else 0x00) | opcodes[opcode])

    mask_bit = 0x80 if masked else 0x00
    if payload_len < 126:
        frame.append(mask_bit | payload_len)
    elif payload_len < 65536:
        # 126 announces a 16-bit big-endian extended length.
        frame.append(mask_bit | 126)
        frame += payload_len.to_bytes(2, 'big')
    else:
        # 127 announces a 64-bit big-endian extended length (8 bytes).
        frame.append(mask_bit | 127)
        frame += payload_len.to_bytes(8, 'big')

    if masked:
        # RFC 6455 section 5.3 requires an unpredictable masking key, so
        # take it from the OS entropy source rather than `random`.
        mask = os.urandom(4)
        frame += mask
        frame += bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    else:
        frame += payload
    return frame
def get_opcode(data):
    """Return the opcode held in the low four bits of the first byte.

    Yields ``None`` when ``data`` has fewer than two bytes.
    """
    return data[0] & 0x0F if len(data) >= 2 else None
def extract_frame(data):
    """Parse one WebSocket frame from ``data`` and report its payload.

    Handles the 7-bit, 16-bit (126) and 64-bit (127) payload-length forms
    and removes the mask when the MASK bit is set. A non-empty payload is
    printed to stderr.

    Args:
        data: Bytes-like object that starts at the first byte of a frame.

    Returns:
        The payload as ``bytes`` (unmasked if the frame was masked), or
        ``None`` when ``data`` is too short to contain the frame header.
        If ``data`` ends before the announced payload length, only the
        bytes actually present are returned.
    """
    if len(data) < 2:
        print('Not enough bytes received')
        return None

    second = data[1]
    masked = bool(second & 0x80)
    payload_length = second & 0x7F

    # 126 / 127 are markers: the real length follows in 2 / 8 bytes.
    if payload_length == 126:
        ext_width = 2
    elif payload_length == 127:
        ext_width = 8
    else:
        ext_width = 0

    # Offset of the first payload byte: the two fixed header bytes, the
    # extended length (if any) and the 4-byte masking key (if any).
    start = 2 + ext_width + (4 if masked else 0)
    if len(data) < start:
        print('Not enough bytes received')
        return None

    if ext_width:
        payload_length = int.from_bytes(data[2:2 + ext_width], 'big')

    payload = bytes(data[start:start + payload_length])
    if masked:
        # The masking key occupies the four bytes right before the payload.
        key = data[start - 4:start]
        payload = bytes(b ^ key[i % 4] for i, b in enumerate(payload))

    if payload_length > 0:
        print('Received payload of %d bytes: %s\r\n' % (payload_length, payload), file=sys.stderr)
    return payload
def generate_key():
    """Return a fresh ``Sec-WebSocket-Key`` header value.

    RFC 6455 section 4.1 defines the key as a randomly selected 16-byte
    nonce, base64-encoded. The nonce is read from the OS entropy source.

    Returns:
        The base64 encoding of the nonce as ``bytes`` (24 ASCII characters).
    """
    return base64.b64encode(os.urandom(16))
# Thread target: forward lines typed on stdin to the server
def get_user_input(sock):
    """Read lines from stdin forever and send each one as a masked text frame.

    Args:
        sock: Connected socket-like object offering ``sendall`` and
            ``close``.

    The loop only ends on KeyboardInterrupt or EOFError; in that case a
    notice is printed and ``sock`` is closed.
    """
    try:
        while True:
            std_in = input('')
            frame = get_metadata(True, 'text', True, std_in)
            # send() may write only part of the buffer; sendall() keeps
            # going until the whole frame is on the wire.
            sock.sendall(frame)
    except (KeyboardInterrupt, EOFError):
        print('Received KeyboardInterrupt/EOFError')
        sock.close()
if __name__ == '__main__':
    # Connect by name so the script keeps working when the server's IP
    # address changes.
    TCP_HOST = 'echo.websocket.events'
    # SSL port is 443
    TCP_PORT = 443

    try:
        s = socket.create_connection((TCP_HOST, TCP_PORT))
    except OSError as e:
        print('Connection failed', e)
        sys.exit(1)

    sslsock = None
    try:
        # The default context verifies the server certificate and checks
        # that it matches server_hostname.
        context = ssl.create_default_context()
        sslsock = context.wrap_socket(s, server_hostname=TCP_HOST, server_side=False)
    except (ssl.SSLError, ssl.CertificateError, OSError) as e:
        print('SSL socket wrapping failed', e)
        s.close()
        sys.exit(1)

    key = generate_key()
    headers_l = [
        # RFC 6455 section 4.1: the request target is the resource name
        # (or an absolute http/https URI), never a ws/wss URI.
        'GET /?encoding=text HTTP/1.1',
        'Host: echo.websocket.events:443',
        'Connection: Upgrade',
        'Upgrade: websocket',
        'Origin: https://echo.websocket.events',
        'Sec-WebSocket-Version: 13',
        'User-Agent: python-websocket-client',
        'Sec-WebSocket-Key: ' + key.decode('utf-8')
    ]
    # A blank line indicates all HTTP headers have been sent
    headers = '{}\r\n\r\n'.format('\r\n'.join(headers_l)).encode('utf-8')
    print(headers)

    # Open connection by sending headers
    sslsock.sendall(headers)
    data = sslsock.recv(1024)
    if len(data) > 0:
        print('%s' % data, file=sys.stderr)
    else:
        sslsock.close()
        sys.exit(1)

    # The upgrade only succeeded if the status line carries code 101.
    status_line = data.split(b'\r\n', 1)[0]
    if status_line.split()[1:2] != [b'101']:
        print('WebSocket handshake failed', file=sys.stderr)
        sslsock.close()
        sys.exit(1)

    try:
        _thread.start_new_thread(get_user_input, (sslsock, ))
        while True:
            data = sslsock.recv(2**19)
            # Check for closed connection
            if not data:
                break
            # Check for ping, otherwise extract frame payload
            if get_opcode(data) == 9:
                # NOTE(review): RFC 6455 section 5.5.3 expects a pong to
                # echo the ping's application data; the fixed 'PONG'
                # payload is kept unchanged here.
                frame = get_metadata(True, 'pong', True, 'PONG')
                sslsock.sendall(frame)
            else:
                extract_frame(data)
    except (KeyboardInterrupt, EOFError):
        pass
    finally:
        print('\n')
        sslsock.close()
        # Exiting from inside `finally` also discards any exception still
        # propagating, so the script always ends with status 0 here.
        sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment