@fralau
Last active July 7, 2021 13:30
Python: A simple and general solution for calling OS commands

A simple and general solution for calling OS commands from Python

Problem

With Python, calling an OS command in a simple way has long been a pain in the neck, with multiple solutions continuously asked, answered, counter-answered and sometimes superseded (os.system, which the documentation now discourages in favor of subprocess, and so on).

Diagnosis

The key is to realize that there are, really, four ways of calling an OS command from a high level language:

  1. as a command: you want to print the output as it comes
  2. as a function: you want no printed output, but a result in the form of a string
  3. as a function with side effect: you want to execute the command, watch what it does, and then analyse the output.
  4. as an ongoing process: you want to get every returned line as soon as it comes and do something with it.

The solution

Explanation

The clever idea, as suggested by Zaiste, is to make a generator function that returns the result of the execution of the OS command, line by line.

This takes care of the fourth case, and implicitly of the first three. Getting the whole result in one go, after the command has executed, is very simple: apply list to the generator.
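As a rough illustration of that idea (not the gist's own code, which follows in the Code section), here is a minimal sketch of such a generator. The name stream_lines and the child command are assumptions made for the example; the child is simply the current Python interpreter, so the snippet does not depend on any particular OS utility.

```python
import subprocess
import sys


def stream_lines(command):
    """Yield each line of the command's stdout as soon as it is available."""
    with subprocess.Popen(command, stdout=subprocess.PIPE,
                          universal_newlines=True) as process:
        # Iterating over the pipe stops at end of file, not at a blank line.
        for line in process.stdout:
            yield line.rstrip("\n")


# Hypothetical child command: the current interpreter printing two lines.
CHILD = [sys.executable, "-c", "print('first'); print('second')"]

# Case 4: handle every line as it comes.
for line in stream_lines(CHILD):
    print("got:", line)

# Cases 1 to 3: collect everything in one go by applying list().
all_lines = list(stream_lines(CHILD))
```

The same generator serves both uses: the loop consumes lines one at a time, while list() simply drains it.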

Sundry

  • Subprocess returns byte strings. They are converted to text with str(x, ENCODING), where ENCODING = 'UTF-8'.
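A small sketch of that conversion on Python 3, assuming a child process that writes UTF-8 bytes (the child command below is made up for the example):

```python
import subprocess
import sys

ENCODING = "UTF-8"

# Hypothetical child: the current interpreter writing raw UTF-8 bytes.
child = [sys.executable, "-c",
         "import sys; sys.stdout.buffer.write('caf\\u00e9\\n'.encode('utf-8'))"]

raw = subprocess.check_output(child)   # a bytes object
text = str(raw, ENCODING)              # equivalent to raw.decode(ENCODING)
```

Note that str(x, ENCODING) is a Python 3 idiom; x.decode(ENCODING) does the same job and also exists on Python 2.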

Code

import sys, subprocess, shlex

class Popen2(subprocess.Popen):
    "Context manager for Python2"
    def __enter__(self):
        return self
    def __exit__(self, type, value, traceback):
        if self.stdout:
            self.stdout.close()
        if self.stderr:
            self.stderr.close()
        if self.stdin:
            self.stdin.close()
        # Wait for the process to terminate, to avoid zombies.
        self.wait()


def os_command(command, print_output=True, shell=False):
    """
    Run an OS command (utility function) and returns a generator
    with each line of the stdout.

    In case of error, the stderr is forwarded through the exception.

    For the arguments, see run_os_command.
    If you are not sure whether to use os_command or run_os_command,
    then the second is likely for you.
    """
    ENCODING = 'UTF-8'
    if isinstance(command, str):
        # if a string, split into a list:
        command = shlex.split(command)
    # we need a proper context manager for Python 2:
    if sys.version_info < (3,2):
        Popen = Popen2
    else:
        Popen = subprocess.Popen
    # Process:
    with Popen(command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    shell=shell) as process:
        while True:
            # Test for end of output BEFORE stripping: a blank line is b'\n',
            # only a true end of file is b''.
            line = process.stdout.readline()
            if not line:
                # end of output: drain stderr, then wait for the exit code
                stderr_msg = process.stderr.read().decode(ENCODING)
                process.wait()
                errno = process.returncode
                if errno:
                    errmsg = "Call of '%s' failed with error %s\n%s" % \
                                            (command, errno, stderr_msg)
                    raise OSError(errno, errmsg)
                break
            # remove the line ending only:
            line = line.rstrip(b'\r\n')
            # Python 3: convert to unicode:
            if sys.version_info > (3,0):
                line = str(line, ENCODING)
            if print_output:
                print(line)
            yield line

def run_os_command(command, print_output=True, shell=False):
    """
    Execute a command, printing as you go (unless you want to suppress it)

    Arguments:
    ----------
        command: either a string, or a list containing each element of the command
            e.g. ['ls', '-l']
        print_output: print the results as the command executes
            (default: True)
        shell: call the shell; this activates globbing, etc.
            (default: False, as this is safer)

    Returns:
    --------
        A string containing the stdout
    """
    r = list(os_command(command, print_output=print_output, shell=shell))
    return "\n".join(r)


def os_get(command, shell=False):
    """
    Execute a command as a function

    Arguments:
    ----------
        command: either a string, or a list containing each element of the command
            e.g. ['ls', '-l']
        shell: call the shell; this activates globbing, etc.
            (default: False)

    Returns:
    --------
        A string containing the output
    """
    return run_os_command(command, print_output=False, shell=shell)

Application

Case 1: Command

run_os_command(['ls'])

Case 2: Function (a string)

r = os_get(['ls'])

Which is really:

r = run_os_command(['ls'], print_output=False)

Case 3: Function with side effect (also printing)

r = run_os_command(['ls'])

Case 4: Get a generator and do something with it

for line in os_command(['ping', '-c100', 'www.google.com'], print_output=False):
     print("Look at what just happened:", line)

By default, it prints the lines as it goes. If you want to suppress that and do your own printing, set the print_output flag to False.

Error Management

The error management was difficult to get right.

Capturing exit codes

For a call of a legitimate command with incorrect parameters, an OSError is raised (the solution found here was to query the process for its return code once its output is exhausted, before closing the generator).
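To make the mechanism concrete, here is a minimal sketch, separate from the gist's code, of turning a non-zero exit code into an OSError that carries the stderr text. The failing child command is an assumption made for the example.

```python
import subprocess
import sys

# Hypothetical failing child: writes to stderr and exits with status 3.
child = [sys.executable, "-c",
         "import sys; sys.stderr.write('bad parameter\\n'); sys.exit(3)"]

with subprocess.Popen(child, stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE) as process:
    # communicate() drains both pipes and waits for the process to end.
    out, err = process.communicate()
    code = process.returncode

caught = None
try:
    if code:
        raise OSError(code, "Call failed: %s" % err.decode("UTF-8").strip())
except OSError as e:
    caught = e
    print("errno:", e.errno, "message:", e.strerror)
```

One thing to keep in mind with this approach: on Python 3, constructing an OSError with certain errno values yields a subclass (for example FileNotFoundError for 2), so catching plain OSError is the safe choice.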

Incorrect commands

For a missing command on the OS, or incorrect command:

  • On Python 3, it raises a FileNotFoundError (this is automatically raised by subprocess).
  • On Python 2, this raises an OSError.
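Since FileNotFoundError is a subclass of OSError on Python 3, catching OSError covers both versions. A short sketch, using a command name that is assumed not to exist on the machine:

```python
import subprocess

missing = None
try:
    # Hypothetical command name, assumed to be absent from the PATH.
    subprocess.Popen(["no-such-command-hopefully-xyz"])
except OSError as e:
    # Python 3: e is a FileNotFoundError; Python 2: a plain OSError.
    missing = e
    print("could not start the command:", e)
```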

Context manager for subprocess.Popen

As of Python 3.2, Popen supports the context manager protocol, which makes sure that all files are closed: it should be called as with Popen(...) as process rather than process = Popen(...), otherwise a warning (typically a ResourceWarning) may be raised.

For compatibility with previous versions, a context manager Popen2 had to be added (see Stack Overflow: Exception handling when using Python's Subprocess Popen).
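The effect of the context manager can be checked with a few lines on Python 3. This is only a sketch; the child command is an assumption made for the example.

```python
import subprocess
import sys

# Hypothetical child: the current interpreter printing one line.
child = [sys.executable, "-c", "print('hello')"]

with subprocess.Popen(child, stdout=subprocess.PIPE) as process:
    output = process.stdout.read()

# On leaving the block, the pipe is closed and the process has been waited for.
closed = process.stdout.closed
code = process.returncode
```

The Popen2 class in the Code section does the same work by hand in its __exit__ method: close the three streams, then wait.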

@Ms-Amanda

Thank you so much for sharing, it is very useful for me.
But when I tested the code, I found a problem. The problem lies in the rstrip() method.

while True:
    line = proc.stdout.readline().rstrip()
    if not line:
        proc.poll()
        break
    print(line)

This test for the end of the command only works if the output contains no blank lines.
If the output does contain a blank line, rstrip() turns it into an empty string and the loop stops too early.
Here is a code example that triggers the bug:

# test in MacOS

import time
import subprocess

proc = subprocess.Popen('ping -c 5 -i 0.1 www.baidu.com',
                       shell=True,
                       stdout=subprocess.PIPE,
                       universal_newlines=True,
                       )

while True:
    line = proc.stdout.readline().rstrip()
    if not line:
        proc.poll()
        break
    print(line)
    time.sleep(1)

The result is:

PING www.a.shifen.com (180.101.49.11): 56 data bytes
64 bytes from 180.101.49.11: icmp_seq=0 ttl=52 time=12.401 ms
64 bytes from 180.101.49.11: icmp_seq=1 ttl=52 time=10.121 ms
64 bytes from 180.101.49.11: icmp_seq=2 ttl=52 time=12.677 ms
64 bytes from 180.101.49.11: icmp_seq=3 ttl=52 time=11.770 ms
64 bytes from 180.101.49.11: icmp_seq=4 ttl=52 time=11.890 ms

Process finished with exit code 0

The correct output should be:

$ ping -c 5 -i 0.1 www.baidu.com
PING www.a.shifen.com (180.101.49.11): 56 data bytes
64 bytes from 180.101.49.11: icmp_seq=0 ttl=52 time=11.738 ms
64 bytes from 180.101.49.11: icmp_seq=1 ttl=52 time=11.290 ms
64 bytes from 180.101.49.11: icmp_seq=2 ttl=52 time=11.441 ms
64 bytes from 180.101.49.11: icmp_seq=3 ttl=52 time=12.141 ms
64 bytes from 180.101.49.11: icmp_seq=4 ttl=52 time=11.510 ms

--- www.a.shifen.com ping statistics ---
5 packets transmitted, 5 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 11.290/11.624/12.141/0.296 ms

Therefore rstrip() cannot be applied before the end-of-output test; the line breaks have to be handled some other way.
For example, if you only need to print the output of the command, you can do this:

while True:
    line = proc.stdout.readline()
    if not line:
        proc.poll()
        break
    print(line,end='')
    time.sleep(1)
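The difference can also be reproduced without ping. The following is a minimal sketch, assuming a child process (here the current Python interpreter) whose output contains a blank line; the helper name read_all is made up for the example.

```python
import subprocess
import sys

# Hypothetical child whose output contains a blank line in the middle.
child = [sys.executable, "-c", "print('before'); print(); print('after')"]


def read_all(strip_before_test):
    """Read the child's output, stripping before or after the EOF test."""
    lines = []
    with subprocess.Popen(child, stdout=subprocess.PIPE,
                          universal_newlines=True) as proc:
        while True:
            line = proc.stdout.readline()
            if strip_before_test:
                line = line.rstrip()   # a blank line becomes '' here
            if not line:
                break                  # '' is taken to mean end of file
            lines.append(line.rstrip("\n"))
    return lines


truncated = read_all(strip_before_test=True)
complete = read_all(strip_before_test=False)
```

With stripping done first, the loop stops at the blank line; with the test done first, all three lines come through, including the empty one.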
