-
-
Save kuk/5630079 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import collections
import errno
import fcntl
import os
import select
import shlex
import signal
import sys
import time
def command_is_running(pid):
    """Return True if process ``pid`` is currently in the running (``R``) state.

    The state is read from the Linux ``/proc/<pid>/stat`` file. The second
    field of that line is the command name wrapped in parentheses, and it may
    itself contain spaces or an ``R``. So instead of searching the whole line
    for ``' R '``, the line is split after the *last* closing parenthesis and
    the first field that follows (the state) is compared.

    Returns False if the process's ``/proc`` entry no longer exists (the
    process has exited and been reaped).
    """
    try:
        with open('/proc/%d/stat' % pid) as stat:
            stats = stat.read()
    except (IOError, OSError) as e:
        if e.errno in (errno.ENOENT, errno.ESRCH):
            return False
        raise
    fields_after_name = stats.rpartition(')')[2].split()
    return bool(fields_after_name) and fields_after_name[0] == 'R'
def wait_command_processes_line(pid, poll_interval=0.001):
    """Block while the command process ``pid`` is in the running state.

    The command is treated as busy with the current input line while
    ``command_is_running(pid)`` is true. The loop sleeps ``poll_interval``
    seconds between checks rather than spinning, so the parent does not
    compete for the CPU with the command it is waiting on.

    NOTE(review): when called right after writing the line, the command may
    not have been scheduled yet. In that case it is not in state ``R`` and this
    returns immediately, so the wait is a heuristic, not a synchronisation.

    :param pid: process id of the wrapped command.
    :param poll_interval: seconds to sleep between state checks.
    :returns: a dict of per-line statistics:
        ``polls`` -- how many checks saw the command running;
        ``elapsed`` -- wall-clock seconds spent waiting.
    """
    started = time.time()
    polls = 0
    while command_is_running(pid):
        polls += 1
        time.sleep(poll_interval)
    return {'polls': polls, 'elapsed': time.time() - started}
class IsTestLine(object):
    """Decide whether an input line is worth keeping as a test line.

    The decision is made from the ``stats`` collected while the wrapped
    command processed the line. The selection heuristic is pluggable. Without
    a ``predicate`` every line is rejected, so the test-line stream stays
    empty rather than receiving arbitrary lines.

    :param predicate: optional callable ``predicate(stats) -> bool``
        implementing the heuristic.
    """

    def __init__(self, predicate=None):
        self.predicate = predicate

    def __call__(self, stats):
        """Return True if the line that produced ``stats`` should be kept."""
        if self.predicate is None:
            # TODO: implement a default heuristic; until then keep nothing.
            return False
        return bool(self.predicate(stats))


is_test_line = IsTestLine()
def make_non_blocking_read_pipe():
    """Create a pipe whose read end (only) is non-blocking.

    The write end is left blocking, because a program writing to it cannot be
    assumed to handle short or failed non-blocking writes.

    :returns: ``(read_descriptor, write_descriptor)`` as from ``os.pipe()``.
    """
    read_descriptor, write_descriptor = os.pipe()
    # F_SETFL replaces all file status flags, so add O_NONBLOCK to the
    # current flags instead of overwriting them.
    flags = fcntl.fcntl(read_descriptor, fcntl.F_GETFL)
    fcntl.fcntl(read_descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return (read_descriptor, write_descriptor)
def read_non_blocking_pipe(descriptor,
                           chunk_size=1000,
                           exit_on_try_again=True):
    """Read what is available from a pipe whose read end is non-blocking.

    A non-blocking read can end in one of four ways:
      * try again -- the pipe is empty but the write end is still open;
      * data      -- some bytes were read;
      * eof       -- the write end is closed and the pipe is drained;
      * error     -- any other ``OSError``.

    The algorithm is:
      * by default, return on "try again". So the call should be made after
        the data is already in the pipe.
      * if ``exit_on_try_again`` is ``False``, wait (with ``select``, without
        spinning) for more data until eof. Use this only when the remaining
        data is not huge, because all of it is kept in memory.
      * return on eof.
      * re-raise on any other error (``EINTR`` is simply retried).

    :param descriptor: read end of a pipe with ``O_NONBLOCK`` set.
    :param chunk_size: maximum number of bytes per ``os.read`` call.
    :param exit_on_try_again: see above.
    :returns: the bytes read (a ``str`` on Python 2); empty if nothing was
        available.
    """
    chunks = []
    while True:
        try:
            chunk = os.read(descriptor, chunk_size)
        except OSError as e:
            if e.errno == errno.EINTR:
                continue
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
            if exit_on_try_again:
                break
            # Sleep until the pipe is readable (new data or eof) instead of
            # busy-looping on EAGAIN.
            select.select([descriptor], [], [])
            continue
        if not chunk:
            break  # eof
        chunks.append(chunk)
    return b''.join(chunks)
def _ensure_descriptor_open(descriptor):
    """Keep ``descriptor`` if it is already open, otherwise make it a copy of stdout.

    This lets the caller redirect e.g. ``3>f1`` from the shell, while any
    descriptor that was not redirected falls back to stdout. It also avoids
    leaking extra stdout duplicates that a plain ``os.dup(1)`` would create
    when the low descriptors are already taken.
    """
    try:
        os.fstat(descriptor)
    except OSError as e:
        if e.errno != errno.EBADF:
            raise
        os.dup2(1, descriptor)


def _write_all(descriptor, data):
    """Write all of ``data`` to ``descriptor``; ``os.write`` may write less."""
    while data:
        written = os.write(descriptor, data)
        data = data[written:]


def _forward_outputs(output_pipes, exit_on_try_again):
    """Copy what each command output pipe holds to descriptor ``3 + index``."""
    # NOTE(review): pipes are drained one after another. In the final
    # wait-for-eof pass, a command that fills pipe 2 while pipe 1 is still
    # open could block; select() over all pipes would avoid that.
    for index, (read_descriptor, _) in enumerate(output_pipes):
        output = read_non_blocking_pipe(read_descriptor,
                                        exit_on_try_again=exit_on_try_again)
        if output:
            _write_all(index + 3, output)


def _exec_command(parsed_command, descriptors, input_pipe, output_pipes):
    """In the forked child: wire the pipes to 0, 1, 3, ... and exec the command.

    Never returns. If exec fails, the child reports the error on stderr and
    exits with status 127 instead of falling back into the parent's code.
    """
    try:
        input_read_descriptor, input_write_descriptor = input_pipe
        os.dup2(input_read_descriptor, 0)
        os.close(input_read_descriptor)
        os.close(input_write_descriptor)
        for index, (read_descriptor, write_descriptor) in enumerate(output_pipes):
            os.dup2(write_descriptor, index + 3)  # command writes to 3, 4, ...
            os.close(write_descriptor)
            os.close(read_descriptor)
        os.dup2(3, 1)  # redirect stdout to the 3rd descriptor as MR does
        os.close(descriptors + 3)  # the test-line descriptor is parent-only
        # Child (command) descriptors, e.g. for N=2:
        #   0 inpipe r
        #   1 outpipe1 w
        #   2 err
        #
        #   3 outpipe1 w
        #   4 outpipe2 w
        # So the layout mimics what the command expects, but all input comes
        # from the parent and all output goes to the parent via pipes.
        os.execvp(parsed_command[0], parsed_command)
    except Exception as e:
        sys.stderr.write('cannot run %r: %s\n' % (parsed_command[0], e))
        sys.stderr.flush()
    finally:
        os._exit(127)


def main(argv):
    """Run the wrapper for ``argv`` (``[script, command, N]``); see usage text."""
    if len(argv) != 3:
        sys.exit('usage: %s command [number of command output descriptors]\n'
                 ' Script wraps command. Command has one input stream and\n'
                 ' N>=1 output streams. Wrapped command has N+1 output streams.\n'
                 ' If input line is likely to be suitable as test line \n'
                 ' it is written onto stream N+1.\n' % argv[0])
    command = argv[1]
    try:
        descriptors = int(argv[2])
        if descriptors <= 0:
            raise ValueError()
    except ValueError:
        sys.exit('Number of output descriptors should be positive integer, not %r'
                 % (argv[2],))
    # Parse before forking so a bad command line fails in the parent.
    parsed_command = shlex.split(command)
    if not parsed_command:
        sys.exit('Command should not be empty')

    # The command uses N output descriptors plus one for test-line output.
    # 0, 1, 2 are already in use; make sure 3 .. 3 + N are open.
    for descriptor in range(3, descriptors + 4):
        _ensure_descriptor_open(descriptor)
    input_pipe = os.pipe()
    # The write ends of output pipes are used by the command, which can't be
    # expected to handle non-blocking writes, so only the read ends are
    # non-blocking.
    output_pipes = [make_non_blocking_read_pipe() for _ in range(descriptors)]
    # Typical descriptor layout (N=2, for example):
    #   0 in, 1 out, 2 err
    #   3 out, 4 out, 5 out (or files redirected by the shell)
    #   6 inpipe r, 7 inpipe w
    #   8 outpipe1 r, 9 outpipe1 w, 10 outpipe2 r, 11 outpipe2 w
    command_pid = os.fork()
    if command_pid == 0:
        _exec_command(parsed_command, descriptors, input_pipe, output_pipes)

    input_read_descriptor, input_write_descriptor = input_pipe
    try:
        os.close(input_read_descriptor)
        for _, write_descriptor in output_pipes:
            os.close(write_descriptor)
        # The parent writes to inpipe and reads from the outpipes. If launched as
        #   ./get_test_input 'cmd' 2 3>f1 4>f2 5>test_input
        # descriptors 3, 4, 5 point to those files instead of stdout.
        stdin = getattr(sys.stdin, 'buffer', sys.stdin)  # bytes on Python 3
        # readline (not file iteration) avoids Python 2's read-ahead buffering.
        for line in iter(stdin.readline, b''):
            try:
                _write_all(input_write_descriptor, line)
            except OSError as e:
                if e.errno != errno.EPIPE:
                    raise
                break  # the command closed its input; stop feeding it
            stats = wait_command_processes_line(command_pid)
            _forward_outputs(output_pipes, exit_on_try_again=True)
            if is_test_line(stats):
                _write_all(descriptors + 3, line)
        os.close(input_write_descriptor)
        _forward_outputs(output_pipes, exit_on_try_again=False)
        for read_descriptor, _ in output_pipes:
            os.close(read_descriptor)
        os.waitpid(command_pid, 0)
    except KeyboardInterrupt:
        os.kill(command_pid, signal.SIGINT)


if __name__ == '__main__':
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment