Created
June 8, 2010 15:10
-
-
Save leeor/430160 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from System.Diagnostics import Process, ProcessStartInfo | |
from System import Environment | |
import shlex | |
import os | |
# Sentinel values accepted by Popen's stdout/stderr parameters.
# PIPE: capture the stream's output into an in-memory Buffer.
PIPE = -1
# STDOUT: only meaningful for stderr; makes stderr share the stdout Buffer.
STDOUT = -2
class CalledProcessError(Exception):
    """
    Raised by check_call() when the command it ran finished with a
    non-zero exit status.

    Attributes:
        returncode -- the exit status reported by the process
        cmd        -- the command that was run
    """

    def __init__(self, returncode, cmd):
        self.cmd = cmd
        self.returncode = returncode

    def __str__(self):
        template = "Command '%s' returned non-zero exit status %d"
        return template % (self.cmd, self.returncode)
class Buffer(object):
    """
    Collects the text of a redirected stream, one line at a time.

    The accumulated text is available in the ``data`` attribute. When
    ``universal_newlines`` is true every line ending is stored as a single
    line feed; otherwise lines end with carriage return + line feed.
    """

    def __init__(self, universal_newlines):
        self.universal_newlines = universal_newlines
        self.data = ''

    def _translate_newlines(self, data):
        """Return data with CRLF pairs and lone CRs replaced by LF."""
        return data.replace("\r\n", "\n").replace("\r", "\n")

    def append(self, data):
        """Add one line (without its terminator) to the buffer; None is ignored."""
        if data is not None:
            # .NET stream redirection hands the event handler one line per
            # call, so the terminator is put back here: first the standard
            # Windows sequence, then normalised if universal newlines were
            # requested. (Appending the right terminator directly would
            # probably do as well.)
            line = data + '\r\n'
            if self.universal_newlines:
                line = self._translate_newlines(line)
            self.data = self.data + line
class Popen(object):
    """
    Partial stand-in for subprocess.Popen built on the .NET
    System.Diagnostics.Process class.

    Output requested with stdout=PIPE / stderr=PIPE is collected
    asynchronously into in-memory buffers and exposed as strings through
    the ``stdout`` and ``stderr`` properties (not as file objects).

    Parameters:
        args       -- command as a string (split with shlex) or a sequence
                      of strings; the caller's sequence is never modified
        bufsize    -- accepted for signature compatibility; unused
        executable -- when given, placed ahead of args as the program to start
        stdin      -- any non-None value redirects the child's standard input
        stdout     -- PIPE to capture standard output
        stderr     -- PIPE to capture standard error, STDOUT to merge it
                      into the captured standard output
        shell      -- run the command through %COMSPEC% /c
        cwd        -- working directory for the child
        env        -- mapping that replaces the child's environment
                      (SYSTEMROOT is always carried over from the parent)
        universal_newlines -- store captured line endings as a line feed

    Raises ValueError for preexec_fn, startupinfo, creationflags, and for
    close_fds combined with any stream redirection.
    """

    def __init__(self, args, bufsize=0, executable=None, stdin=None,
                 stdout=None, stderr=None, preexec_fn=None, close_fds=False,
                 shell=False, cwd=None, env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0):
        if preexec_fn is not None:
            raise ValueError("preexec_fn is not supported on Windows platforms")
        if close_fds and (stdin is not None or stdout is not None or
                          stderr is not None):
            raise ValueError("close_fds is not supported on Windows "
                             "platforms if you redirect stdin/stdout/stderr")
        # also, when using the .NET api, handle inheritance is implied by
        # ProcessStartInfo.UseShellExecute. It seems that handles are never
        # inherited when the shell is used, and are always inherited otherwise.
        if startupinfo is not None:
            raise ValueError("The startupinfo argument is not supported")
        if creationflags != 0:
            raise ValueError("The creationflags argument is not supported")

        self.pid = None
        self.returncode = None

        # streams
        self.stdin = stdin
        self.stdout_buffer = None
        if stdout == PIPE:
            self.stdout_buffer = Buffer(universal_newlines)
        self.stderr_buffer = None
        if stderr == PIPE:
            self.stderr_buffer = Buffer(universal_newlines)
        elif stderr == STDOUT:
            # share one buffer so both streams end up in self.stdout
            self.stderr_buffer = self.stdout_buffer

        if isinstance(args, str):
            # this should be good enough to start with, but issues are
            # expected if the string contains backslashes. Ideally, we need
            # something that fulfills
            # http://msdn.microsoft.com/en-us/library/ms880421
            args = shlex.split(args)
        else:
            # work on a copy: the inserts below must not leak into the
            # caller's list, and this also lets tuples be passed
            args = list(args)

        if executable is not None:
            args.insert(0, executable)
        if shell:
            args.insert(0, '/c')
            args.insert(0, os.environ['COMSPEC'])

        self._execute_child(args, cwd, env)

    @staticmethod
    def _list2cmdline(seq):
        """
        Join a sequence of arguments into one command-line string using the
        quoting rules of the Microsoft C runtime:

        * an argument that is empty or contains a space or tab is wrapped
          in double quotes;
        * a double quote inside an argument is preceded by a backslash;
        * backslashes are literal unless they directly precede a double
          quote (including the closing one), in which case they are doubled.
        """
        rendered = []
        for arg in seq:
            needs_quotes = (not arg) or (' ' in arg) or ('\t' in arg)
            pieces = []
            pending_backslashes = 0
            for ch in arg:
                if ch == '\\':
                    # defer: how to emit these depends on what follows
                    pending_backslashes += 1
                elif ch == '"':
                    pieces.append('\\' * (pending_backslashes * 2) + '\\"')
                    pending_backslashes = 0
                else:
                    pieces.append('\\' * pending_backslashes + ch)
                    pending_backslashes = 0
            if needs_quotes:
                # trailing backslashes would otherwise escape the closing quote
                pieces.append('\\' * (pending_backslashes * 2))
                rendered.append('"' + ''.join(pieces) + '"')
            else:
                pieces.append('\\' * pending_backslashes)
                rendered.append(''.join(pieces))
        return ' '.join(rendered)

    def _execute_child(self, args, cwd, env):
        """Configure and start the .NET process for args[0] with args[1:]."""
        # Quote the arguments so that ones containing whitespace or quotes
        # reach the child intact instead of being re-split.
        self.process_start_info = ProcessStartInfo(
            args[0], self._list2cmdline(args[1:]))
        self.process_start_info.UseShellExecute = False
        self.process_start_info.RedirectStandardInput = self.stdin is not None
        self.process_start_info.RedirectStandardOutput = self.stdout_buffer is not None
        self.process_start_info.RedirectStandardError = self.stderr_buffer is not None
        self.process_start_info.CreateNoWindow = True
        self.process_start_info.UserName = None
        self.process_start_info.Password = None

        if cwd is not None:
            self.process_start_info.WorkingDirectory = cwd

        if env is not None:
            sysroot = self.process_start_info.EnvironmentVariables['SYSTEMROOT']
            self.process_start_info.EnvironmentVariables.Clear()
            for key, value in env.items():
                self.process_start_info.EnvironmentVariables.Add(key, value)
            # maintain the SYSTEMROOT
            self.process_start_info.EnvironmentVariables['SYSTEMROOT'] = sysroot

        self.process_to_run = Process()
        self.process_to_run.StartInfo = self.process_start_info

        # Handlers are attached before Start() and feed each received line
        # into the matching buffer.
        if self.stdout_buffer is not None:
            self.process_to_run.OutputDataReceived += (
                lambda sender, event: self.stdout_buffer.append(event.Data))
        if self.stderr_buffer is not None:
            self.process_to_run.ErrorDataReceived += (
                lambda sender, event: self.stderr_buffer.append(event.Data))

        self.process_to_run.Start()
        self.pid = self.process_to_run.Id

        if self.stdout_buffer is not None:
            self.process_to_run.BeginOutputReadLine()
        if self.stderr_buffer is not None:
            self.process_to_run.BeginErrorReadLine()

    def poll(self):
        """Return the exit code if the process has finished, else None."""
        if self.returncode is None:
            # zero timeout: check for exit without blocking
            if self.process_to_run.WaitForExit(0):
                self.returncode = self.process_to_run.ExitCode
        return self.returncode

    def wait(self):
        """Wait for the process to exit and return its exit code."""
        self.process_to_run.WaitForExit()
        self.returncode = self.process_to_run.ExitCode
        return self.returncode

    def communicate(self, input=None):
        """
        Send input (if any) to the child's standard input, wait for it to
        exit and return a (stdout, stderr) tuple of captured text. Each
        element is None when that stream was not captured separately.
        """
        if self.stdin is not None:
            # Standard input is redirected exactly when self.stdin is set.
            writer = self.process_to_run.StandardInput
            if input is not None:
                writer.Write(input)
            # Close the pipe so the child sees end-of-file; without this a
            # child that reads stdin to the end never exits and wait() hangs.
            writer.Close()
        self.wait()
        return (self.stdout, self.stderr)

    def send_signal(self, signal):
        """Terminate the process; only signal 15 (SIGTERM) is accepted."""
        if signal != 15:
            raise ValueError("Only SIGTERM is supported on Windows platforms")
        self.terminate()

    def terminate(self):
        """Kill the process. Any error raised by Process.Kill() propagates."""
        self.process_to_run.Kill()

    kill = terminate

    def get_stdout(self):
        """Return the text captured from standard output, or None."""
        if self.stdout_buffer is None:
            return None
        return self.stdout_buffer.data

    def get_stderr(self):
        """
        Return the text captured from standard error, or None when it was
        not captured or was merged into standard output (stderr=STDOUT).
        """
        if self.stderr_buffer is None or self.stderr_buffer is self.stdout_buffer:
            return None
        return self.stderr_buffer.data

    stdout = property(get_stdout)
    stderr = property(get_stderr)
def call(*popenargs, **kwargs):
    """
    Start a command, block until it finishes and hand back its exit code.

    Accepts exactly the arguments of the Popen constructor (see
    http://docs.python.org/library/subprocess.html). Example:

    >>> retcode = subprocess.call(["ls", "-l"])
    """
    process = Popen(*popenargs, **kwargs)
    return process.wait()
def check_call(*popenargs, **kwargs):
    """
    Start a command and block until it finishes. An exit code of zero is
    returned as-is; any other exit code raises CalledProcessError, which
    carries the code in its returncode attribute and the command in cmd.

    Accepts exactly the arguments of the Popen constructor (see
    http://docs.python.org/library/subprocess.html). Example:

    >>> subprocess.check_call(["ls", "-l"])
    0
    """
    status = call(*popenargs, **kwargs)
    if status == 0:
        return status

    # Report the command as the caller spelled it: the 'args' keyword if it
    # was used, otherwise the first positional argument.
    command = kwargs.get('args')
    if command is None:
        command = popenargs[0]
    raise CalledProcessError(status, command)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment