Skip to content

Instantly share code, notes, and snippets.

@jathanism
Created January 8, 2013 23:58
Show Gist options
  • Save jathanism/4489235 to your computer and use it in GitHub Desktop.
Save jathanism/4489235 to your computer and use it in GitHub Desktop.
Python script which becomes a pseudo terminal so that it can act as a man in the middle between your terminal emulator and a running process. Source: http://sqizit.bartletts.id.au/2011/02/14/pseudo-terminals-in-python/
# Copyright (c) 2011 Joshua D. Bartlett
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import array
import fcntl
import os
import pty
import select
import signal
import sys
import termios
import tty
# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
START_ALTERNATE_MODE = set('\x1b[?{0}h'.format(i) for i in
… ('1049', '47', '1047'))
END_ALTERNATE_MODE = set('\x1b[?{0}l'.format(i) for i in
… ('1049', '47', '1047'))
ALTERNATE_MODE_FLAGS = tuple(START_ALTERNATE_MODE) +
… tuple(END_ALTERNATE_MODE)
def findlast(s, substrs):
'''
Finds whichever of the given substrings occurs last in the given string
… and returns that substring, or returns None if no such strings
… occur.
'''
i = -1
result = None
for substr in substrs:
pos = s.rfind(substr)
if pos > i:
i = pos
result = substr
return result
class Interceptor(object):
'''
This class does the actual work of the pseudo terminal. The spawn()
… function is the main entrypoint.
'''
def __init__(self):
self.master_fd = None
def spawn(self, argv=None):
'''
Create a spawned process.
Based on the code for pty.spawn().
'''
assert self.master_fd is None
if not argv:
argv = [os.environ['SHELL']]
pid, master_fd = pty.fork()
self.master_fd = master_fd
if pid == pty.CHILD:
os.execlp(argv[0], *argv)
old_handler = signal.signal(signal.SIGWINCH, self._signal_winch)
try:
mode = tty.tcgetattr(pty.STDIN_FILENO)
tty.setraw(pty.STDIN_FILENO)
restore = 1
except tty.error: # This is the same as termios.error
restore = 0
self._init_fd()
try:
self._copy()
except (IOError, OSError):
if restore:
tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)
os.close(master_fd)
self.master_fd = None
signal.signal(signal.SIGWINCH, old_handler)
def _init_fd(self):
'''
Called once when the pty is first set up.
'''
self._set_pty_size()
def _signal_winch(self, signum, frame):
'''
Signal handler for SIGWINCH - window size has changed.
'''
self._set_pty_size()
def _set_pty_size(self):
'''
Sets the window size of the child pty based on the window size of
… our own controlling terminal.
'''
assert self.master_fd is not None
# Get the terminal size of the real terminal, set it on the
… pseudoterminal.
buf = array.array('h', [0, 0, 0, 0])
fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, buf)
def _copy(self):
'''
Main select loop. Passes all data to self.master_read() or
… self.stdin_read().
'''
assert self.master_fd is not None
master_fd = self.master_fd
while 1:
try:
rfds, wfds, xfds = select.select([master_fd,
… pty.STDIN_FILENO], [], [])
except select.error, e:
if e[0] == 4: # Interrupted system call.
continue
if master_fd in rfds:
data = os.read(self.master_fd, 1024)
self.master_read(data)
if pty.STDIN_FILENO in rfds:
data = os.read(pty.STDIN_FILENO, 1024)
self.stdin_read(data)
def write_stdout(self, data):
'''
Writes to stdout as if the child process had written the data.
'''
os.write(pty.STDOUT_FILENO, data)
def write_master(self, data):
'''
Writes to the child process from its controlling terminal.
'''
master_fd = self.master_fd
assert master_fd is not None
while data != '':
n = os.write(master_fd, data)
data = data[n:]
def master_read(self, data):
'''
Called when there is data to be sent from the child process back to
… the user.
'''
flag = findlast(data, ALTERNATE_MODE_FLAGS)
if flag is not None:
if flag in START_ALTERNATE_MODE:
# This code is executed when the child process switches the
… terminal into alternate mode. The line below
… assumes that the user has opened vim, and writes a
… message.
self.write_master('IEntering special mode.\x1b')
elif flag in END_ALTERNATE_MODE:
# This code is executed when the child process switches the
… terminal back out of alternate mode. The line below
… assumes that the user has returned to the command
… prompt.
self.write_master('echo "Leaving special mode."\r')
self.write_stdout(data)
def stdin_read(self, data):
'''
Called when there is data to be sent from the user/controlling
… terminal down to the child process.
'''
self.write_master(data)
if __name__ == '__main__':
i = Interceptor()
i.write_stdout('\nThe dream has begun.\n')
i.spawn(sys.argv[1:])
i.write_stdout('\nThe dream is (probably) over.\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment