Skip to content

Instantly share code, notes, and snippets.

@frankli0324
Last active May 29, 2022 04:10
Show Gist options
  • Save frankli0324/795162a14be988a01e0efa0531f7ac5a to your computer and use it in GitHub Desktop.
Save frankli0324/795162a14be988a01e0efa0531f7ac5a to your computer and use it in GitHub Desktop.
websocket for pwntools
# pip3 install pwntools-tube-websocket
from pwn import *
from wstube import websocket
# Example usage: drive a public WebSocket echo endpoint through the tube API.
a = websocket('wss://echo.websocket.events')
# Read whatever arrives before anything is sent
# (presumably a greeting from the server -- TODO confirm).
print(a.recv())
for _ in range(3):
    # Send 4 bytes, then read the reply back in two 2-byte pieces.
    a.send(b'test')
    print(a.recv(2))
    print(a.recv(2))
# sendline() is the tube helper that appends a newline to the payload.
a.sendline(b'test')
print(a.recv())
a.send(b'12345asdfg')
# Read up to the first run of five digits, then fetch the remainder.
print(a.recvregex(b'[0-9]{5}'))
print(a.recv())
a.close()
from websocket import WebSocket, ABNF, WebSocketException, WebSocketTimeoutException
from pwnlib.tubes.tube import tube
class websocket(tube):
    """pwntools tube backed by a ``websocket-client`` ``WebSocket``.

    Implements the raw hooks of ``tube`` (``recv_raw``, ``send_raw``,
    ``settimeout_raw``, ``connected_raw``, ``shutdown_raw``, ``close``) on
    top of a single WebSocket connection. Received ``str`` payloads are
    encoded to bytes; outgoing data is sent as binary frames.
    """

    def __init__(self, url, headers=None, *args, **kwargs):
        """Create the tube and connect to *url*.

        Args:
            url: WebSocket URL to connect to; also used in the close message.
            headers: Extra handshake headers passed to
                ``WebSocket.connect(header=...)``. ``None`` means no extras.
            *args: Forwarded to ``tube.__init__``.
            **kwargs: Forwarded to ``tube.__init__``.
        """
        if headers is None:
            headers = {}
        super(websocket, self).__init__(*args, **kwargs)
        self.closed = False
        self.sock = WebSocket()
        self.url = url
        self.sock.connect(url, header=headers)

    def recv_raw(self, numb):
        """Receive one WebSocket message and return it as bytes.

        Args:
            numb: Requested byte count. Unused here: a whole message is
                always returned.

        Returns:
            The message payload as bytes, or ``None`` if the receive timed out.

        Raises:
            EOFError: If the tube is already closed, the underlying socket
                raises a ``WebSocketException``, or an empty payload arrives.
        """
        if self.closed:
            raise EOFError

        try:
            data = self.sock.recv()
        except WebSocketTimeoutException:
            # Must be tested before WebSocketException so that a timeout is
            # reported as "no data yet" rather than as end of stream.
            return None
        except WebSocketException:
            self.shutdown("recv")
            raise EOFError

        if isinstance(data, str):
            data = data.encode()

        if not data:
            # An empty payload is treated as end of stream.
            self.shutdown("recv")
            raise EOFError('Recv Error')

        return data

    def send_raw(self, data):
        """Send *data* as a single binary WebSocket message.

        Raises:
            EOFError: If the tube is already closed or the send fails with a
                ``WebSocketException``.
        """
        if self.closed:
            raise EOFError

        try:
            self.sock.send_binary(data)
        except WebSocketException:
            self.shutdown()
            raise EOFError('Send Error')

    def settimeout_raw(self, timeout):
        """Apply *timeout* to the underlying socket, if one exists."""
        # getattr guards against the attribute being missing or None
        # (e.g. after close(); presumably also before __init__ has set it).
        if getattr(self, 'sock', None):
            self.sock.settimeout(timeout)

    def connected_raw(self, direction):
        """Probe the connection by sending a ping and reading one frame.

        Args:
            direction: Unused; the probe does not distinguish directions.

        Returns:
            ``True`` if the frame read after the ping is a pong, otherwise
            ``False`` (including when any error occurs during the probe).
        """
        # NOTE(review): the frame read here is discarded, so a data frame
        # that arrives ahead of the pong appears to be lost -- confirm
        # before relying on this while data is in flight.
        try:
            self.sock.ping()
            opcode, _ = self.sock.recv_data(True)
        except Exception:
            # Any failure means "not connected"; KeyboardInterrupt and
            # SystemExit are deliberately allowed to propagate.
            return False
        return opcode == ABNF.OPCODE_PONG

    def close(self):
        """Close the connection and log a message; no-op if already closed."""
        if not getattr(self, 'sock', None):
            return
        self.closed = True
        self.sock.close()
        self.sock = None
        self._close_msg()

    def _close_msg(self):
        """Log that the connection to ``self.url`` was closed."""
        self.info('Closed connection to %s', self.url)

    def shutdown_raw(self, direction):
        """Shut the socket down and mark the tube closed.

        Args:
            direction: Unused; both directions are shut down together.
        """
        if self.closed:
            return
        self.closed = True
        self.sock.shutdown()
@kjcolley7
Copy link

Will you publish the wstube module? This looks really useful for CTFs!

@frankli0324
Copy link
Author

> Will you publish the wstube module? This looks really useful for CTFs!

Oh, I'm sorry, I thought I did.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment