Skip to content

Instantly share code, notes, and snippets.

@sergray
Created January 17, 2011 17:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergray/783164 to your computer and use it in GitHub Desktop.
Save sergray/783164 to your computer and use it in GitHub Desktop.
Extend an arbitrary interactive Unix command with readline features
import errno
import os
import readline
import select
import subprocess
import sys
class Popen(subprocess.Popen):
    """subprocess.Popen variant with a non-draining ``_communicate``.

    The overridden helper performs a single round of I/O with the child:
    it writes the given input, collects whatever output shows up, and
    returns without waiting for the child to exit, so the pipes can be
    used again on a later call.
    """

    def _communicate(self, input):
        """Exchange one round of data with the child process.

        ``input`` is written to the child's stdin in chunks; when it is
        empty or None, stdin is closed instead.  Output is read from the
        stdout/stderr pipes until select() reports nothing ready within
        0.1 seconds, or until every watched pipe has been removed (input
        fully written, output at end-of-file).

        Returns a ``(stdout, stderr)`` tuple.  Each element is the
        collected string, or None when that stream is not a pipe.
        """
        read_set = []
        write_set = []
        stdout = None  # Return
        stderr = None  # Return

        if self.stdin:
            # Flush stdio buffer.  This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input:
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(
                    read_set, write_set, [], 0.1)
            except select.error as e:
                # A signal interrupted select(); just try again.
                if e.args[0] == errno.EINTR:
                    continue
                raise

            # Nothing became ready within the timeout: treat the round
            # as finished and hand back what was collected so far.
            if not (rlist or wlist or xlist):
                break

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.  POSIX defines PIPE_BUF >= 512
                chunk = input[input_offset:input_offset + 512]
                bytes_written = os.write(self.stdin.fileno(), chunk)
                input_offset += bytes_written
                if input_offset >= len(input):
                    write_set.remove(self.stdin)

            if self.stdout in rlist:
                data = os.read(self.stdout.fileno(), 1024)
                if data:
                    stdout.append(data)
                else:
                    # Empty read means end-of-file: stop watching.
                    read_set.remove(self.stdout)

            if self.stderr in rlist:
                data = os.read(self.stderr.fileno(), 1024)
                if data:
                    stderr.append(data)
                else:
                    read_set.remove(self.stderr)

        # All data exchanged.  Translate lists into strings.
        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        # Translate newlines, if requested.  We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering).  (`file` is the Python 2 builtin type.)
        if self.universal_newlines and hasattr(file, 'newlines'):
            if stdout:
                stdout = self._translate_newlines(stdout)
            if stderr:
                stderr = self._translate_newlines(stderr)

        return (stdout, stderr)
# Driver: run the command given on the command line as a child process and
# relay lines typed at a readline-enabled prompt to it, printing its replies.

# Prompt shown by raw_input(); its characters are also stripped from the
# front of the child's output (presumably to hide the child's own prompt).
prompt = '> '

# The wrapped command is mandatory: it is the child's argv and also names
# the history file.
if len(sys.argv) < 2:
    print("Please check your arguments")
    sys.exit(1)

try:
    proc = Popen(sys.argv[1:],
                 stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT)
except (ValueError, OSError):
    # ValueError: invalid Popen arguments; OSError: command could not be run.
    print("Please check your arguments")
    sys.exit(1)

# Per-command history file in the current directory.  Only the basename is
# used so that a command given with a path still yields a valid file name.
hist_fname = ".%s" % os.path.basename(sys.argv[1])
try:
    readline.read_history_file(hist_fname)
except IOError:
    # No readable history yet (e.g. first run); start with an empty one.
    pass

# Send an empty line first to collect whatever the child prints on startup.
out, err = proc.communicate("\n")
while True:
    # NOTE: lstrip() removes any leading run of the prompt's characters
    # ('>' and ' '), not just a single exact prefix.
    if out:
        print(out.lstrip(prompt))
    if err:
        print(err)

    # Stop as soon as the child has exited.
    proc.poll()
    if proc.returncode is not None:
        break

    try:
        cmd = raw_input(prompt)
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / Ctrl-D at the prompt ends the session.
        proc.terminate()
        break
    out, err = proc.communicate(cmd + "\n")

readline.write_history_file(hist_fname)
sys.exit(proc.returncode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment