Skip to content

Instantly share code, notes, and snippets.

@dlovell
Created October 6, 2018 21:00
Show Gist options
  • Save dlovell/489cee7f884bcf1637ca85220c34a7de to your computer and use it in GitHub Desktop.
Save dlovell/489cee7f884bcf1637ca85220c34a7de to your computer and use it in GitHub Desktop.
import io
import os
import tempfile
import functools
import contextlib
import subprocess
import concurrent.futures
handle_stdout = functools.partial(print, '1:',)
handle_stderr = functools.partial(print, '2:',)
class Tee:
"""
"""
def __init__(self, handle_line=print):
(read_fd, write_fd) = os.pipe()
self._read_fh = os.fdopen(read_fd)
self._write_fh = os.fdopen(write_fd, 'wt')
self._handle_line = handle_line
self._sio = io.StringIO()
self._executor = concurrent.futures.ThreadPoolExecutor(2)
self._fut = self._executor.submit(self._f)
def _f(self):
line_sio = io.StringIO()
for c in iter(lambda: self._read_fh.read(1), ''):
if c == '':
v = line_sio.getvalue()
if v:
self._handle_line(v)
self.close()
return
else:
self._sio.write(c)
if c == '\n':
self._handle_line(line_sio.getvalue())
line_sio.truncate(0)
else:
line_sio.write(c)
def fileno(self):
return self._write_fh.fileno()
def getvalue(self):
return self._sio.getvalue()
def close(self):
# order matters!
self._write_fh.close()
self._read_fh.close()
@property
def closed(self):
return self._read_fh.closed and self._write_fh.closed
def _wait_on_proc(self, proc=None):
proc.wait() if proc else None
self.close()
def wait_on_proc(self, proc=None):
self._proc_fut = self._executor.submit(self._wait_on_proc, proc)
def tee_popen(command,
handle_stdout=handle_stdout, handle_stderr=handle_stderr):
tee_out = Tee(handle_stdout)
tee_err = Tee(handle_stderr)
proc = subprocess.Popen(command, stdout=tee_out, stderr=tee_err)
tee_out.wait_on_proc(proc)
tee_err.wait_on_proc(proc)
return (proc, tee_out, tee_err)
@contextlib.contextmanager
def commands_as_script(commands):
with tempfile.NamedTemporaryFile(mode='wt', delete=False) as fh:
fh.write('\n'.join(commands) + '\n')
fh.flush()
# file must be closed and executable
os.chmod(fh.name, 0o505)
fh.close()
# this is the caller's "command" to pass to subprocess.Popen
yield [fh.name]
# caller must not exit context till process is done
os.unlink(fh.name)
TEST_COMMAND = """\
#!/bin/sh
sleep 1
echo "$(date): 1"
echo "$(date): 1" >&2
sleep 1
echo "$(date): 2"
sleep 1
echo "$(date): 3"
sleep 1
echo "$(date): 4"\
"""
if __name__ == '__main__':
script_lines = TEST_COMMAND.split('\n')
with commands_as_script(script_lines) as command:
(proc, tee_out, tee_err) = tee_popen(command)
# block till proc done, else script may disappear before its done
proc.wait()
print()
print('tee_out:\n\t' + tee_out.getvalue().replace('\n', '\n\t'))
print('tee_err:\n\t' + tee_err.getvalue().replace('\n', '\n\t'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment