Skip to content

Instantly share code, notes, and snippets.

@reisraff
Last active June 12, 2019 18:39
Show Gist options
  • Save reisraff/421b20d665ec2c0d2c4697baa3f302bd to your computer and use it in GitHub Desktop.
Save reisraff/421b20d665ec2c0d2c4697baa3f302bd to your computer and use it in GitHub Desktop.
Simple pwntool::process
#!/usr/bin/env python
import subprocess
import select
import fcntl
import os
import pty
import tty
import sys
import time
class P(object):
    """Minimal pwntools-like wrapper around a child process.

    The child's stdin is a pipe; its stdout and stderr are attached to a
    pseudo-terminal put in raw mode, whose master side is exposed as the
    non-blocking binary stream ``self.process.stdout``.

    All data received from the child is returned as ``bytes`` (``str`` on
    Python 2). Data to send may be given as bytes or text; text is encoded
    as UTF-8.
    """

    def __init__(self, command, args=None, debug=True):
        """Spawn *command* with the optional argument list *args*.

        Args:
            command: Executable to run.
            args: Extra command-line arguments (a fresh list is used when
                omitted, so calls never share state).
            debug: When true, echo every sent/received chunk to stdout.
        """
        args = [] if args is None else list(args)
        # Kept for interface compatibility; nothing in this class fills them.
        self.stdout = []
        self.stderr = []
        self.debug = debug
        self.is_in_interactive = False

        master, slave = pty.openpty()
        tty.setraw(master)
        tty.setraw(slave)

        if self.debug:
            sys.stdout.write('Starting process "{}"\n'.format(' '.join([command] + args)))
            sys.stdout.flush()

        try:
            self.process = subprocess.Popen(
                [command] + args,
                stdin=subprocess.PIPE,
                stdout=slave,
                stderr=subprocess.STDOUT,
                # Unbuffered: line buffering (bufsize=1) is not supported
                # for binary pipes and only triggers a RuntimeWarning.
                bufsize=0,
                close_fds=True
            )
        except Exception:
            # Do not leak the pty descriptors when the spawn fails.
            os.close(master)
            os.close(slave)
            raise

        # The child holds its own copy of the slave end; ours must be closed
        # so the master reports hang-up once the child goes away.
        os.close(slave)
        self.process.stdout = os.fdopen(master, 'rb', 0)

        # Non-blocking reads let recvuntil() drain "whatever is available".
        fd = self.process.stdout.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    @staticmethod
    def _to_bytes(data):
        """Return *data* as bytes, encoding text as UTF-8."""
        if isinstance(data, bytearray):
            return bytes(data)
        if isinstance(data, bytes):
            return data
        return data.encode('utf-8')

    @staticmethod
    def _to_text(data):
        """Return *data* as the native ``str`` type, suitable for printing."""
        if isinstance(data, bytearray):
            data = bytes(data)
        if isinstance(data, str):
            return data
        if isinstance(data, bytes):
            # Python 3 bytes: undecodable bytes must not abort the output.
            return data.decode('utf-8', 'replace')
        # Python 2 unicode.
        return data.encode('utf-8')

    def poll(self):
        """Check whether the child is still running.

        Returns:
            None while the child is alive.

        Raises:
            SystemExit: if the child has terminated; its exit code is
                printed first.
        """
        returncode = self.process.poll()
        if returncode is not None:
            self._stop_noticed = time.time()
            sys.stdout.write("Process stopped with exit code {}\n".format(returncode))
            sys.stdout.flush()
            sys.exit()
        return returncode

    def send_raw(self, data):
        """Write *data* (bytes or text) to the child's stdin unchanged."""
        self.poll()
        payload = self._to_bytes(data)
        self.process.stdin.write(payload)
        self.process.stdin.flush()
        if self.debug:
            self.__print_debug('Sent: ', payload)

    def sendline(self, line):
        """Send *line* (bytes or text) followed by a newline."""
        self.send_raw(self._to_bytes(line) + b'\n')

    def can_read(self):
        """Return True if child output becomes readable within 50 ms."""
        readable, _, _ = select.select([self.process.stdout], [], [], 0.05)
        return bool(readable)

    def recvuntil(self, delimiter):
        """Read child output up to and including *delimiter*.

        Args:
            delimiter: Bytes or text marking the end of the read (may be
                longer than one byte). ``None`` or an empty value means
                "read until the stream has nothing more to give".

        Returns:
            The bytes read, including the delimiter when one was found.
            Reading also stops when the stream reports an error or
            end-of-file and, in interactive mode, as soon as no more data
            is immediately available.
        """
        if delimiter is not None:
            delimiter = self._to_bytes(delimiter)
        stream = self.process.stdout

        # Always wait for the first byte; select() already blocks for up
        # to 50 ms per attempt, so no extra sleep is needed.
        while not self.can_read():
            pass

        buff = bytearray()
        while True:
            if not self.is_in_interactive:
                while not self.can_read():
                    pass
            try:
                # One byte at a time: there is no push-back buffer, so
                # reading further could swallow data past the delimiter.
                chunk = stream.read(1)
            except (IOError, OSError):
                # EAGAIN (Python 2) or the pty hanging up after child exit.
                break
            if not chunk:
                # None: nothing available right now; b'': end of file.
                break
            buff += chunk
            if delimiter and buff.endswith(delimiter):
                break

        data = bytes(buff)
        if self.debug:
            self.__print_debug('Received: ', data)
        return data

    def recvline(self):
        """Read one line (terminated by a newline) from the child."""
        self.poll()
        return self.recvuntil('\n')

    def recv(self):
        """Read child output without a delimiter (see recvuntil)."""
        self.poll()
        return self.recvuntil(None)

    def set_debug(self, val):
        """Enable or disable debug echoing of traffic."""
        self.debug = val

    def close(self):
        """Close the pipes to the child (best effort, safe to call twice)."""
        for stream in (self.process.stdin, self.process.stdout):
            if stream is None:
                continue
            try:
                stream.close()
            except (IOError, OSError):
                # Already closed or broken pipe: nothing left to release.
                pass

    def interactive(self, debug=None):
        """Run a simple prompt loop forwarding typed lines to the child.

        Args:
            debug: Optional new value for the debug flag, applied after
                the initial output has been drained.

        The loop ends when stdin reaches end-of-file, or with SystemExit
        (raised by poll) when the child terminates.
        """
        self.is_in_interactive = True
        try:
            self.poll()
            # Drain pending output; it is only shown through debug echoing.
            self.recv()
            if debug is not None:
                self.debug = debug
            read_line = input if sys.version_info[0] >= 3 else raw_input  # noqa: F821
            while True:
                self.poll()
                try:
                    data = read_line('$ ')
                except EOFError:
                    break
                if data:
                    self.sendline(data)
                    sys.stdout.write(self._to_text(self.recv()))
                    sys.stdout.flush()
        finally:
            self.is_in_interactive = False

    def __print_debug(self, tag, data):
        """Print *data* with its length, a readable form and a hex dump."""
        raw = self._to_bytes(data)
        text = self._to_text(raw)
        needs_newline = not text.endswith('\n')
        sys.stdout.write('[\033[31mDEBUG\033[0m] {} {} bytes\n'.format(tag, hex(len(raw))))
        sys.stdout.write(' \033[34mRAW\033[0m: {}{}'.format(text, ('\n' if needs_newline else '')))
        sys.stdout.write(' \033[34mHEX\033[0m: ')
        # bytearray iteration yields ints on both Python 2 and 3.
        sys.stdout.write(''.join('{:02X} '.format(byte) for byte in bytearray(raw)))
        sys.stdout.write('\n\n')
        sys.stdout.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment