Skip to content

Instantly share code, notes, and snippets.

@carljm

carljm/test_ws.py

Last active Apr 22, 2019
Embed
What would you like to do?
Establishing a websocket connection to SocketIO 1.x from Python
import json
import requests
from websocket import create_connection
BASE = 'localhost:3000/socket.io/?EIO=3'
# First establish a polling-transport HTTP connection to get a session ID
url = 'http://%s&transport=polling' % BASE
print url
resp = requests.get(url)
print resp.status_code, resp.headers, resp.content
# Ignore the bin-packed preliminaries in the SocketIO response and just parse
# the JSON; it tells us everything we really need to know
json_data = json.loads(resp.content[resp.content.find('{'):])
sid = json_data['sid']
assert sid == resp.cookies['io']
# Now we have a session ID, establish a websocket connection
ws_url = 'ws://%s&transport=websocket&sid=%s' % (BASE, sid)
print ws_url
conn = create_connection(ws_url)
# Ping/pong not required, but verifies the connection is working
conn.send('2probe')
assert conn.recv() == '3probe'
# This is an "upgrade" packet. Not sure why it's needed, since we've already
# upgraded the connection to WebSocket and established a WebSocket
# connection. I guess maybe it upgrades our Socket.IO session to use this
# websocket connection as its transport? In any case, the next step doesn't
# work unless we do this first.
conn.send('5')
# Send a socket.io event!
# 4 = MESSAGE
# 2 = EVENT
# "chat" is event name, "hello" is payload
packet = '42["chat","hello"]'
conn.send(packet)
#from IPython import embed; embed()
@AXGKl

This comment has been minimized.

Copy link

@AXGKl AXGKl commented May 23, 2018

why didn't I find this epic gist 3 hours earlier. thanks so much!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.