Skip to content

Instantly share code, notes, and snippets.

@bingli224
Created April 30, 2019 16:58
Show Gist options
  • Save bingli224/8c792f92058c2728b2d3b128ce3cf7be to your computer and use it in GitHub Desktop.
Save bingli224/8c792f92058c2728b2d3b128ce3cf7be to your computer and use it in GitHub Desktop.
Simple middleware tool to test websocket connection
## By BingLi224
## 00:57 THA 30/04/2019
##
## Simple man-in-the-middle tool for testing and inspecting a websocket connection.
##
## Use http://demos.kaazing.com/echo/ as a free websocket testing server/client.
## 1) run this script
## 2) open http://demos.kaazing.com/echo/
## 3) set "Location" to "ws://localhost:8888/echo"
## 4) monitor the connection in console
import select
import socket
import re
def _set_header(request, name, value):
    """Replace the value of the first ``name:`` header line in ``request``.

    The header name is matched case-insensitively and only at the start of a
    line, so text that merely happens to contain the old value elsewhere in
    the request is left untouched.

    Returns a ``(new_request, found_values)`` tuple, where ``found_values``
    lists the values of every matching header line before the rewrite (empty
    when the header is absent, in which case ``request`` is returned as is).
    """
    pattern = re.compile(
        r'(\n' + re.escape(name) + r':[ \t]*)([^\r\n]*)', re.IGNORECASE
    )
    found = [m.group(2) for m in pattern.finditer(request)]
    ## a function replacement keeps backslashes in `value` literal
    rewritten = pattern.sub(lambda m: m.group(1) + value, request, count=1)
    return rewritten, found


def _forward_chunk(src, dst, label):
    """Relay one chunk from ``src`` to ``dst`` and log it to the console.

    Returns False once ``src`` has been closed by its peer (an empty read),
    True otherwise.
    """
    chunk = src.recv(2000)
    print(" === FROM %s [%d] ===" % (label, len(chunk)))
    print(chunk)
    if not chunk:
        return False
    dst.sendall(chunk)
    return True


def middle(listen_addr=("localhost", 8888), target_host="demos.kaazing.com",
           target_port=80, idle_timeout=60.0):
    """Accept one browser connection and relay it to a websocket server.

    The first request received from the browser (the HTTP upgrade handshake)
    has its ``Host`` header pointed at the target server and its ``Origin``
    header set to this proxy's own ``host:port``; after that, bytes are copied
    verbatim in both directions and every chunk is printed to the console.

    Parameters:
        listen_addr: ``(host, port)`` tuple this proxy listens on.
        target_host: host name of the real websocket server.
        target_port: TCP port of the real websocket server.
        idle_timeout: seconds of silence on both sockets after which the
            relay stops; ``None`` waits forever.

    Returns None once either side closes, a socket error occurs, or the
    idle timeout expires. All sockets are closed on every exit path.
    """
    listen_host, listen_port = listen_addr
    print("Listening [" + listen_host + ":" + str(listen_port) + "]")

    ## listen for browser; only one connection is served, so the listening
    ## socket is released as soon as it has been accepted
    with socket.socket() as serv:
        serv.bind(listen_addr)
        serv.listen(1)
        conn, _ = serv.accept()

    with conn:
        ## latin-1 maps every byte to one code point, so the decode/encode
        ## round trip can neither fail nor alter bytes we do not rewrite
        request = conn.recv(2000).decode('latin-1')
        print("=== FROM STARTER ===")
        print(request)
        if not request:
            return  ## browser closed before sending anything

        ## replace the target
        if target_port == 80:
            host_value = target_host
        else:
            host_value = "%s:%d" % (target_host, target_port)
        request, host = _set_header(request, "Host", host_value)  ## will send to echo server
        request, origin = _set_header(
            request, "Origin", listen_host + ":" + str(listen_port)
        )  ## from my server
        print(host)
        print(origin)
        print("=== EDITED DATA ===")
        print(request)

        ## conn to the echo server
        print("=== CONNECT TO ECHO ===")
        with socket.socket() as echo:
            echo.connect((target_host, target_port))

            ## forward the rewritten handshake
            print("=== FORWARD TO ECHO ===")
            echo.sendall(request.encode('latin-1'))

            ## Sockets stay in blocking mode: select() already tells us when
            ## recv() will not block, and sendall() on a non-blocking socket
            ## may fail part-way through with BlockingIOError.
            routes = {conn: (echo, "STARTER"), echo: (conn, "ECHO")}
            watched = [conn, echo]
            print("=== WAIT FOR ANY INCOMES === ")
            while True:
                readable, _, errored = select.select(
                    watched, [], watched, idle_timeout
                )
                if errored or not readable:
                    break  ## socket error condition, or timeout
                try:
                    ## all() stops at the first side that reports a close
                    alive = all(
                        _forward_chunk(src, *routes[src]) for src in readable
                    )
                except OSError as err:
                    print(err)
                    break
                if not alive:
                    break
    ## end: both `with` blocks have closed echo and conn
## Run the proxy only when executed as a script, so that importing this
## module does not open a listening socket and block on accept().
if __name__ == "__main__":
    middle()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment