Created
May 8, 2019 16:58
-
-
Save daryl314/414ced790bddf8f93e682cd94732398e to your computer and use it in GitHub Desktop.
Execute in a background pseudo-terminal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import errno
import os
import pty
import re
import select
import sys
import tty
class ReadBuffer(object):
    """Read a file descriptor one byte at a time, reassembling escape sequences.

    Ordinary bytes are handed back immediately.  Bytes that belong to a
    terminal escape sequence (anything starting with ESC, 0x1b) are held back
    until the sequence is complete and are then returned as a single chunk.
    All data is handled as bytes, which is identical to ``str`` on Python 2.
    """

    def __init__(self, fd):
        """Wrap ``fd``, which is a raw descriptor or an object with ``fileno()``."""
        # Duck-type instead of testing against the Python-2-only ``file``
        # builtin, which raises NameError on Python 3.
        if hasattr(fd, 'fileno'):
            fd = fd.fileno()
        self.fd = fd
        self.escape = b''  # bytes of the escape sequence collected so far

    def read(self):
        """Read one byte from the descriptor.

        Returns:
            * a single byte when it is not part of an escape sequence,
            * a complete escape sequence once its final byte has arrived,
            * ``None`` while an escape sequence is still incomplete,
            * at end of file, any partially collected escape sequence and,
              after that, the empty byte string.
        """
        c = os.read(self.fd, 1)
        if not c:
            # End of file: flush whatever was buffered instead of dropping it.
            pending, self.escape = self.escape, b''
            return pending or c
        if not self.escape and c != b'\x1b':
            return c
        self.escape += c
        if self.completeEscape(self.escape):
            out, self.escape = self.escape, b''
            return out
        return None

    @staticmethod
    def completeEscape(seq):
        """Return True if ``seq`` (bytes starting with ESC) is a complete escape sequence."""
        # Slice rather than index: indexing bytes yields an int on Python 3.
        assert seq[:1] == b'\x1b', 'escape sequence must start with ESC'
        if len(seq) == 1:
            return False
        if len(seq) == 2:
            # "ESC [" opens a CSI sequence; any other two-byte form is complete.
            return seq != b'\x1b['
        # A sequence ends with a final byte in the range 0x40-0x7E.  This
        # covers letters as well as terminators such as "~" (e.g. ESC [ 3 ~).
        return b'@' <= seq[-1:] <= b'~'
class Logger(object):
    """Write a tagged, line-numbered trace of the data flowing through a pty."""

    def __init__(self, width=3, logger=sys.stderr):
        """Create a logger.

        Args:
            width: zero-padded width of the line counter.
            logger: object with a ``write`` method that receives the trace.
        """
        self.fmt = '\n[%0{}d %s] '.format(width)
        self.last_tag = None  # tag of the line still being extended, if any
        self.counter = 0      # number of trace lines started so far
        self.logger = logger

    def log(self, tag, txt):
        """Append ``txt`` to the trace under the label ``tag``.

        Consecutive chunks with the same tag share a line; a new line starts
        when the tag changes, when the chunk is an escape sequence, or after
        a chunk that ended with a newline.  ``txt`` may be text or bytes and
        may be empty.
        """
        # Bytes (what os.read yields on Python 3) are mapped 1:1 to text so
        # the string tests below work; on Python 2 bytes is str, so this
        # branch is skipped.
        if isinstance(txt, bytes) and not isinstance(txt, str):
            txt = txt.decode('latin-1')
        # start a new line if tag changed or if text is an escape sequence
        # (startswith, unlike txt[0], is safe for empty text)
        if tag != self.last_tag or txt.startswith('\x1b'):
            self.logger.write(self.fmt % (self.counter, tag))
            self.counter += 1
        # log text to current line, escaped and without the surrounding quotes
        self.logger.write(repr(txt)[1:-1])
        # start a new line if text ends with a newline; otherwise save tag to continue line
        if txt.endswith(('\n', '\n\r')):
            self.last_tag = None
        else:
            self.last_tag = tag
class PtyHandler(object):
    """pseudo-terminal interactivity"""

    @classmethod
    def background(cls, callback, verbose=False):
        """Execute a callback function in a background pseudo-terminal.

        The process forks: the child runs ``callback`` with its standard
        streams attached to a new pty, while the parent opens a second pty
        for the user to attach to (e.g. with GNU screen) and relays traffic
        between the two until the child side closes.

        Args:
            callback: zero-argument callable executed in the child process.
            verbose: when true, trace all relayed traffic via ``Logger``.
        """
        (child_pid, pty_fd) = pty.fork()
        if child_pid == 0:  # in child process with stdin/stdout/stderr mapped to pty
            tty.setraw(sys.stdin)  # https://en.wikipedia.org/wiki/Terminal_mode
            callback()
        else:  # in parent process
            (master, slave) = pty.openpty()
            try:
                tty.setraw(slave)  # https://en.wikipedia.org/wiki/Terminal_mode
                print("Connect to pty: `screen {}`".format(os.ttyname(slave)))
                cls.intercept(
                    master,               # file descriptor for user-facing pseudo-terminal
                    pty_fd,               # file descriptor for background process
                    sLF=True,             # add line feed to slave newlines to make GNU screen happy
                    sFilter={b'\x1b[6n'},  # filter out slave "Query Cursor Position" requests
                    verbose=verbose,      # display debugging information?
                )
            finally:
                # Release the descriptors once relaying stops or fails.
                for fd in (master, slave, pty_fd):
                    os.close(fd)

    @classmethod
    def intercept(cls, master_fd, slave_fd, verbose=False, mLF=False, sLF=False, mFilter=(), sFilter=()):
        """Intercept traffic between two TTY's.

        Relays data in both directions until one side reports end of file.

        Args:
            master_fd: descriptor of the user-facing side (traffic tagged 'M').
            slave_fd: descriptor of the background side (traffic tagged 'S').
            verbose: when true, trace the traffic with a ``Logger``.
            mLF, sLF: expand "\\n" to "\\n\\r" in data read from that side.
            mFilter, sFilter: chunks read from that side that must not be forwarded.
        """
        mBuf = ReadBuffer(master_fd)
        sBuf = ReadBuffer(slave_fd)
        logger = Logger() if verbose else None
        while True:
            readable, _, _ = select.select([master_fd, slave_fd], [], [])
            for r in readable:
                # Compare descriptors by value: ``is`` on integers only works
                # by accident of CPython's small-integer cache.
                if r == slave_fd:
                    alive = cls.route(sBuf, master_fd, logger, 'S', addLF=sLF, filtered=sFilter)
                else:
                    alive = cls.route(mBuf, slave_fd, logger, 'M', addLF=mLF, filtered=mFilter)
                if not alive:
                    return

    @classmethod
    def route(cls, buf, out_fd, logger, tag, addLF=False, filtered=()):
        """Read data from a ReadBuffer and route to an output file descriptor.

        Args:
            buf: object whose ``read()`` returns bytes, or None when nothing
                is ready to forward yet.
            out_fd: descriptor the data is written to.
            logger: ``Logger``-like object, or None to disable tracing.
            tag: label passed to the logger for this direction.
            addLF: expand "\\n" to "\\n\\r" before forwarding.
            filtered: chunks that are logged but not forwarded.

        Returns:
            False when the source reached end of file, True otherwise.
        """
        try:
            c = buf.read()
        except OSError as e:
            # A pty whose other end has been closed may report EIO rather
            # than an empty read; treat that as end of file.
            if e.errno != errno.EIO:
                raise
            return False
        if c is None:  # incomplete escape sequence; nothing to forward yet
            return True
        if not c:      # empty read: end of file
            return False
        if addLF:
            c = c.replace(b'\n', b'\n\r')
        if logger is not None:
            logger.log(tag, c)
        if c not in filtered:
            os.write(out_fd, c)
        return True
if __name__ == '__main__':

    def _ipython_session():
        """Demo callback: start an embedded IPython shell inside the background pty."""
        # Local defined only so that it is visible from the IPython prompt.
        x = 42
        __import__('IPython').embed()

    # Launch the demo shell in a background pty with traffic tracing enabled.
    PtyHandler.background(callback=_ipython_session, verbose=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment