Skip to content

Instantly share code, notes, and snippets.

@daryl314
Created May 8, 2019 16:58
Show Gist options
  • Save daryl314/414ced790bddf8f93e682cd94732398e to your computer and use it in GitHub Desktop.
Save daryl314/414ced790bddf8f93e682cd94732398e to your computer and use it in GitHub Desktop.
Execute in a background pseudo-terminal
import pty, sys, os, re, tty, select
class ReadBuffer(object):
    """Read a file descriptor byte by byte, keeping escape sequences whole.

    Ordinary bytes are handed back one at a time.  Once an ESC byte is seen,
    the following bytes are accumulated until they form a complete escape
    sequence, which is then returned as a single chunk.

    All data is handled as bytes, which is what os.read() returns (on
    Python 2 that is the same type as str).
    """

    ESC = b'\x1b'

    def __init__(self, fd):
        """Wrap *fd*, an integer descriptor or any object with fileno()."""
        # Duck-type on fileno() instead of isinstance(fd, file): the `file`
        # builtin only exists on Python 2 and raises NameError on Python 3.
        if hasattr(fd, 'fileno'):
            fd = fd.fileno()
        self.fd = fd
        self.escape = b''  # bytes of the escape sequence collected so far

    def read(self):
        """Read one byte from the descriptor.

        Returns:
            * a single byte for ordinary data,
            * a complete escape sequence once its final byte arrives,
            * None while an escape sequence is still incomplete,
            * at end of file: any partially collected escape sequence first,
              then b'' on every later call.
        """
        # Deliberately one byte per call so callers can select() on the raw
        # descriptor without data hiding in a user-space buffer.
        c = os.read(self.fd, 1)
        if not c:
            # EOF: flush a dangling partial escape rather than losing it;
            # with nothing pending this yields b'' as the EOF marker.
            pending, self.escape = self.escape, b''
            return pending
        if not self.escape and c != self.ESC:
            return c
        self.escape += c
        if self.completeEscape(self.escape):
            out, self.escape = self.escape, b''
            return out
        return None

    @staticmethod
    def completeEscape(seq):
        """True if sequence is a complete escape sequence

        *seq* must start with ESC.  A lone ESC is incomplete; ESC followed by
        any byte other than '[' is a complete two-byte escape; anything longer
        is complete once its last byte is a final byte in the range '@'..'~'.
        """
        # ord() on a length-1 slice works for bytes and text alike, whereas
        # indexing bytes yields an int on Python 3.
        assert seq and ord(seq[:1]) == 0x1B
        if len(seq) == 1:
            return False
        if len(seq) == 2:
            # "ESC [" only opens a CSI sequence; parameters/final byte follow
            return ord(seq[1:2]) != 0x5B
        # Final bytes span 0x40-0x7E, not just letters: e.g. the Delete key
        # sends "ESC [ 3 ~", which a letters-only test would never complete.
        return 0x40 <= ord(seq[-1:]) <= 0x7E
class Logger(object):
    """Log data flow

    Consecutive chunks carrying the same tag are appended to one numbered
    line; a new line is started when the tag changes, when a chunk begins
    with ESC, or after a chunk that ended with a newline.
    """

    def __init__(self, width=3, logger=sys.stderr):
        """Create a logger.

        Args:
            width: zero-padded width of the line counter.
            logger: object with a write() method that receives the trace.
        """
        self.fmt = '\n[%0{}d %s] '.format(width)
        self.last_tag = None  # tag of the line being continued, if any
        self.counter = 0      # number of log lines started so far
        self.logger = logger

    def log(self, tag, txt):
        """Append *txt* (bytes or text, possibly empty) under label *tag*."""
        # Pick literals of the same type as txt: bytes.startswith() and
        # bytes.endswith() reject str arguments on Python 3.
        if isinstance(txt, bytes):
            escape, line_ends = b'\x1b', (b'\n', b'\n\r')
        else:
            escape, line_ends = u'\x1b', (u'\n', u'\n\r')
        # start a new line if tag changed or if text is an escape sequence
        # (startswith, unlike txt[0], is safe for an empty chunk)
        if tag != self.last_tag or txt.startswith(escape):
            self.logger.write(self.fmt % (self.counter, tag))
            self.counter += 1
        # log text to current line: repr() makes control characters visible;
        # drop the optional b/u type prefix, then the surrounding quotes
        self.logger.write(repr(txt).lstrip('bu')[1:-1])
        # start a new line if text ends with a newline; otherwise save tag
        # to continue line
        self.last_tag = None if txt.endswith(line_ends) else tag
class PtyHandler(object):
    """pseudo-terminal interactivity"""

    @classmethod
    def background(cls, callback, verbose=False):
        """Execute a callback function in a background pseudo-terminal

        Forks a child attached to a new pty and runs *callback* there.  The
        parent opens a second, user-facing pty, prints how to attach to it,
        and shuttles traffic between the two until the background side
        closes.

        Args:
            callback: zero-argument callable executed in the child process.
            verbose: display debugging information about the routed traffic?
        """
        (child_pid, pty_fd) = pty.fork()
        if child_pid == 0:  # in child process with stdin/stdout/stderr mapped to pty
            tty.setraw(sys.stdin)  # https://en.wikipedia.org/wiki/Terminal_mode
            callback()
            # Stop here: without an explicit exit the child would return
            # into the caller and run the parent's remaining code as well.
            sys.exit(0)
        # in parent process
        (master, slave) = pty.openpty()
        try:
            tty.setraw(slave)  # https://en.wikipedia.org/wiki/Terminal_mode
            print("Connect to pty: `screen {}`".format(os.ttyname(slave)))
            cls.intercept(
                master,                # file descriptor for user-facing pseudo-terminal
                pty_fd,                # file descriptor for background process
                sLF=True,              # add line feed to slave newlines to make GNU screen happy
                sFilter={b'\x1b[6n'},  # filter out slave "Query Cursor Position" requests
                verbose=verbose,       # display debugging information?
            )
        finally:
            # Release every descriptor opened here, even if routing failed.
            for fd in (master, slave, pty_fd):
                try:
                    os.close(fd)
                except OSError:
                    pass  # best-effort cleanup; must not mask the real error

    @classmethod
    def intercept(cls, master_fd, slave_fd, verbose=False, mLF=False, sLF=False, mFilter=(), sFilter=()):
        """Intercept traffic between two TTY's

        Data read from *slave_fd* is routed to *master_fd* and vice versa.
        Returns once either side reports end of stream.

        Args:
            master_fd: user-facing descriptor.
            slave_fd: descriptor of the background process.
            verbose: log every routed chunk through a Logger?
            mLF, sLF: add the extra line-ending byte to newlines coming from
                the master / slave side.
            mFilter, sFilter: collections of byte chunks to drop when they
                come from the master / slave side.
        """
        mBuf = ReadBuffer(master_fd)
        sBuf = ReadBuffer(slave_fd)
        logger = Logger() if verbose else None
        while True:
            readable, _, _ = select.select([master_fd, slave_fd], [], [])
            for r in readable:
                # compare descriptors by value; `is` only works by accident
                if r == slave_fd:
                    alive = cls.route(sBuf, master_fd, logger, 'S', addLF=sLF, filtered=sFilter)
                elif r == master_fd:
                    alive = cls.route(mBuf, slave_fd, logger, 'M', addLF=mLF, filtered=mFilter)
                else:
                    continue
                # Only an explicit False ends the loop, so an overriding
                # route() that returns None keeps the historical behaviour.
                if alive is False:
                    return

    @classmethod
    def route(cls, buf, out_fd, logger, tag, addLF=False, filtered=()):
        """Read data from a ReadBuffer and route to an output file descriptor

        Args:
            buf: object whose read() returns a bytes chunk, None while an
                escape sequence is incomplete, or b'' at end of file.
            out_fd: descriptor the chunk is written to.
            logger: Logger that records the chunk, or None.
            tag: label passed to the logger.
            addLF: append a carriage return after every newline in the chunk?
            filtered: chunks that are logged but not forwarded.

        Returns:
            False when the source reached end of stream, True otherwise.
        """
        try:
            c = buf.read()
        except OSError as exc:
            import errno
            # A pty master fails with EIO (instead of returning EOF) once
            # the other end has been closed; treat that as end of stream.
            if exc.errno != errno.EIO:
                raise
            return False
        if c is None:  # escape sequence still being assembled
            return True
        if not c:  # empty read: end of file
            return False
        if addLF:
            c = c.replace(b'\n', b'\n\r')
        if logger is not None:
            logger.log(tag, c)
        if c not in filtered:
            os.write(out_fd, c)
        return True
if __name__ == '__main__':

    def fn():
        """Open an embedded IPython shell; runs inside the background pty."""
        x = 42  # a local for the embedded IPython session to see
        __import__('IPython').embed()

    # Demo entry point: host the shell in a background pseudo-terminal and
    # trace all routed traffic.
    PtyHandler.background(callback=fn, verbose=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment