# Originally published as GitHub gist kuk/5630079 by @kuk,
# created May 22, 2013 19:08.
#!/usr/bin/env python
import collections
import errno
import fcntl
import os
import select
import shlex
import signal
import sys
import time
def command_is_running(pid, proc_root='/proc'):
    """Return True if process ``pid`` is in the running (``R``) state.

    Reads ``<proc_root>/<pid>/stat`` (Linux procfs format). The command name
    in that line is wrapped in parentheses and may itself contain spaces,
    parentheses or the text " R ", so the state is taken from the first field
    after the *last* ``)`` instead of being searched for anywhere in the line.

    :param pid: process id to inspect (an integer).
    :param proc_root: procfs mount point; override for tests or non-standard
        mounts.
    :returns: ``True`` when the state field is ``R``, ``False`` otherwise
        (including sleeping and zombie processes).
    :raises IOError/OSError: if the stat file cannot be read.
    """
    with open(os.path.join(proc_root, '%d' % pid, 'stat')) as stat:
        stats = stat.read()
    fields_after_name = stats.rpartition(')')[2].split()
    return bool(fields_after_name) and fields_after_name[0] == 'R'
def wait_command_processes_line(pid, poll_interval=0.001):
    """Wait while the command process ``pid`` is in the running (``R``) state.

    Called right after a line has been written to the command's stdin. The
    assumption is that once the command leaves the ``R`` state it is done
    with the line (presumably blocked reading the next one).

    NOTE(review): the original body was an unfinished stub: an empty
    ``while`` loop, which is a syntax error, and an undefined ``stats``.
    Statistics collection is still not implemented, so this returns
    ``None``, and ``is_test_line`` must accept that.
    There is also a race: if the command has not been scheduled yet after
    the write, it is still sleeping and this returns immediately.

    :param pid: process id of the wrapped command.
    :param poll_interval: seconds to sleep between ``/proc`` polls, so the
        wrapper does not spin a CPU that the command needs.
    :returns: the per-line statistics (currently always ``None``).
    """
    stats = None
    while command_is_running(pid):
        # TODO: stats = update_stats(stats, pid)
        time.sleep(poll_interval)
    return stats
class IsTestLine(object):
    """Callable predicate: should the current input line be emitted as a test line?

    It is called with whatever ``wait_command_processes_line`` returned for
    the line (the per-line "stats").

    NOTE(review): the heuristic was never written. In the original,
    ``__call__`` had only a ``# return ...`` comment as its body, which is a
    syntax error. Until it is implemented, no line is classified as a test
    line, so the extra output stream stays empty.
    """

    def __call__(self, stats):
        """Return True if ``stats`` mark a good test line (currently never)."""
        # TODO: implement the heuristic over ``stats``.
        return False


# Shared instance used by the main loop to classify each input line.
is_test_line = IsTestLine()
def make_non_blocking_read_pipe():
    """Create a pipe whose read end is non-blocking and whose write end blocks.

    The write end is handed to the wrapped command, which cannot be assumed
    to handle EAGAIN on writes, so only the read end gets ``O_NONBLOCK``.

    :returns: ``(read_descriptor, write_descriptor)`` as from ``os.pipe()``.
    """
    read_descriptor, write_descriptor = os.pipe()
    # Add O_NONBLOCK to the existing file-status flags instead of replacing them.
    flags = fcntl.fcntl(read_descriptor, fcntl.F_GETFL)
    fcntl.fcntl(read_descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return (read_descriptor, write_descriptor)
def read_non_blocking_pipe(descriptor,
                           chunk_size=1000,
                           exit_on_try_again=True):
    """Read from a non-blocking pipe descriptor and return the bytes read.

    A non-blocking read ends in one of four ways:

    * try again: no data is available right now (EAGAIN/EWOULDBLOCK);
    * some data was read;
    * end of file: every write end of the pipe is closed;
    * an error.

    The algorithm is:

    * by default, return on "try again". Call this only once the data is
      expected to be in the pipe already;
    * if ``exit_on_try_again`` is ``False``, keep reading until end of file,
      waiting in ``select`` rather than busy-looping while the pipe is
      empty. All data is accumulated in memory, so do this only when the
      remaining output is small;
    * return on end of file;
    * re-raise any other ``OSError``. Reads interrupted by a signal (EINTR)
      are retried.

    :param descriptor: read end of a pipe, normally opened with ``O_NONBLOCK``.
    :param chunk_size: maximum number of bytes requested per ``os.read`` call.
    :param exit_on_try_again: return on "try again" instead of waiting for EOF.
    :returns: the bytes read, possibly empty.
    """
    chunks = []
    while True:
        try:
            chunk = os.read(descriptor, chunk_size)
        except OSError as error:
            if error.errno == errno.EINTR:
                continue  # interrupted before any data arrived; retry
            if error.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
            if exit_on_try_again:
                break
            # Sleep until the pipe has data or reaches EOF instead of spinning.
            select.select([descriptor], [], [])
            continue
        if not chunk:
            break  # end of file
        chunks.append(chunk)
    # b'' is the native str on Python 2 and bytes on Python 3.
    return b''.join(chunks)
def _parse_arguments(argv):
    """Validate ``argv`` (``[program, command, N]``) and return ``(command_argv, N)``.

    ``command_argv`` is the command split with shell-like quoting rules
    (``shlex.split``). Invalid arguments end the process through
    ``sys.exit`` with a message on stderr, before anything is forked.
    """
    if len(argv) != 3:
        sys.exit('usage: %s command number_of_command_output_descriptors\n'
                 '  Script wraps command. Command has one input stream and\n'
                 '  N>=1 output streams. Wrapped command has N+1 output streams.\n'
                 '  If an input line is likely to be suitable as a test line,\n'
                 '  it is written onto stream N+1.\n' % argv[0])
    try:
        descriptors = int(argv[2])
        if descriptors <= 0:
            raise ValueError()
    except ValueError:
        sys.exit('Number of output descriptors should be a positive integer, not %r'
                 % (argv[2],))
    try:
        command_argv = shlex.split(argv[1])
    except ValueError as error:
        sys.exit('Cannot parse command %r: %s' % (argv[1], error))
    if not command_argv:
        sys.exit('Command should not be empty')
    return command_argv, descriptors


def _exec_command(command_argv, descriptors, input_pipe, output_pipes):
    """Child side of the fork: rewire descriptors, then exec the command.

    Resulting child layout (N=2, for example)::

        0  inpipe r
        1  outpipe1 w   (stdout duplicated onto the first output stream)
        2  err
        3  outpipe1 w
        4  outpipe2 w

    The layout mimics what the command expects, but all input comes from the
    parent and all output goes back to the parent through pipes.

    Never returns. If anything fails before ``execvp`` replaces the process
    image, the error is reported on stderr and the child leaves through
    ``os._exit(127)``, so it cannot fall through into the parent's code.
    """
    try:
        input_read, input_write = input_pipe
        os.dup2(input_read, 0)
        os.close(input_read)
        os.close(input_write)
        for index, (read_descriptor, write_descriptor) in enumerate(output_pipes):
            os.dup2(write_descriptor, index + 3)  # command writes to 3, 4, ...
            os.close(write_descriptor)
            os.close(read_descriptor)
        os.dup2(3, 1)  # redirect stdout to the 3rd descriptor, as MR does
        os.close(descriptors + 3)  # the test-line descriptor is parent-only
        os.execvp(command_argv[0], command_argv)
    except Exception as error:
        sys.stderr.write('cannot run %r: %s\n' % (command_argv[0], error))
        sys.stderr.flush()
    finally:
        # Reached only when execvp did not happen. Never return into the
        # parent's code path, and skip its exit-time buffer flushes.
        os._exit(127)


def _write_all(descriptor, data):
    """Write all of ``data`` to ``descriptor``, retrying after short writes."""
    while data:
        written = os.write(descriptor, data)
        data = data[written:]


def _forward_available_output(output_pipes):
    """Copy whatever is currently buffered in output pipe i to descriptor 3 + i."""
    for index, (read_descriptor, _) in enumerate(output_pipes):
        output = read_non_blocking_pipe(read_descriptor)
        if output:
            _write_all(index + 3, output)


def _forward_output_until_eof(output_pipes):
    """Copy output pipe i to descriptor 3 + i until every pipe reaches EOF.

    All pipes are watched together with ``select``. Draining them one after
    another could deadlock: the command might block on a full pipe that is
    not being read while the parent waits for EOF on a different one.
    """
    targets = {read_descriptor: index + 3
               for index, (read_descriptor, _) in enumerate(output_pipes)}
    while targets:
        readable, _, _ = select.select(list(targets), [], [])
        for read_descriptor in readable:
            output = read_non_blocking_pipe(read_descriptor)
            if output:
                _write_all(targets[read_descriptor], output)
            else:
                # Readable but nothing to read: every write end is closed (EOF).
                os.close(read_descriptor)
                del targets[read_descriptor]


def _relay(command_pid, descriptors, input_pipe, output_pipes):
    """Parent side of the fork: feed stdin line by line and forward the output.

    Parent layout after closing the child's ends (N=2, for example): 0 in,
    1 out, 2 err; 3, 4, 5 output streams; inpipe w; outpipe1 r; outpipe2 r.
    If the wrapper is launched as
    ``./get_test_input 'cmd' 2 3>f1 4>f2 5>test_input``, descriptors 3, 4, 5
    point at those files rather than stdout.

    :returns: the raw wait status of the command, from ``os.waitpid``.
    """
    input_read, input_write = input_pipe
    os.close(input_read)
    for _, write_descriptor in output_pipes:
        os.close(write_descriptor)
    # Read raw bytes: sys.stdin.buffer on Python 3, sys.stdin itself on Python 2.
    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
    for line in stdin:
        try:
            _write_all(input_write, line)
        except OSError as error:
            if error.errno != errno.EPIPE:
                raise
            break  # the command stopped reading; collect what it already wrote
        stats = wait_command_processes_line(command_pid)
        _forward_available_output(output_pipes)
        if is_test_line(stats):
            _write_all(descriptors + 3, line)
    # Closing the write end delivers EOF on the command's stdin.
    os.close(input_write)
    _forward_output_until_eof(output_pipes)
    _, status = os.waitpid(command_pid, 0)
    return status


def main(argv=None):
    """Wrap the command given on the command line (see ``_parse_arguments``).

    ``argv`` defaults to ``sys.argv``. The process exits with the command's
    exit status, with 128 + the signal number if the command was killed by a
    signal, or with 130 after Ctrl-C.
    """
    command_argv, descriptors = _parse_arguments(sys.argv if argv is None else argv)
    # The command uses N output descriptors, and the wrapper needs one more
    # for test lines. 0, 1 and 2 are already in use, so occupy 3 .. 3 + N.
    # Those that the caller did not redirect point at stdout. This also
    # keeps the pipes created below from landing on those numbers.
    for _ in range(descriptors + 1):
        os.dup(1)
    input_pipe = os.pipe()
    # The write ends are used by the user command, which cannot be assumed
    # to handle non-blocking writes, so only the read ends are non-blocking.
    output_pipes = [make_non_blocking_read_pipe() for _ in range(descriptors)]
    # Descriptor layout at this point (N=2, for example):
    #   0 in, 1 out, 2 err
    #   3 out, 4 out, 5 out
    #   6 inpipe r, 7 inpipe w
    #   8 outpipe1 r, 9 outpipe1 w, 10 outpipe2 r, 11 outpipe2 w
    command_pid = os.fork()
    if command_pid == 0:
        _exec_command(command_argv, descriptors, input_pipe, output_pipes)
    try:
        status = _relay(command_pid, descriptors, input_pipe, output_pipes)
    except KeyboardInterrupt:
        try:
            os.kill(command_pid, signal.SIGINT)
        except OSError as error:
            if error.errno != errno.ESRCH:  # an already-gone child is fine
                raise
        sys.exit(128 + int(signal.SIGINT))
    if os.WIFSIGNALED(status):
        sys.exit(128 + os.WTERMSIG(status))
    sys.exit(os.WEXITSTATUS(status))


if __name__ == '__main__':
    main()
# End of gist kuk/5630079.