Last active
November 3, 2023 06:29
-
-
Save wkettler/9235609 to your computer and use it in GitHub Desktop.
Python subprocess with timeout.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import signal | |
class Signal(Exception): | |
""" | |
This exception is raise by the signal handler. | |
""" | |
pass | |
class Timeout(Exception): | |
""" | |
This exception is raised when the command exceeds the defined timeout | |
duration and the command is killed. | |
""" | |
def __init__(self, cmd, timeout): | |
self.cmd = cmd | |
self.timeout = timeout | |
def __str__(self): | |
return "Command '%s' timed out after %d second(s)." % \ | |
(self.cmd, self.timeout) | |
class Retcode(Exception): | |
""" | |
This exception is raise when a command exits with a non-zero exit status. | |
""" | |
def __init__(self, cmd, retcode, output=None): | |
self.cmd = cmd | |
self.retcode = retcode | |
self.output = output | |
def __str__(self): | |
return "Command '%s' returned non-zero exit status %d" % \ | |
(self.cmd, self.returncode) | |
def alarm_handler(signum, frame): | |
raise Signal | |
def execute(cmd, timeout=None): | |
""" | |
Execute a command in the default shell. If a timeout is defined the command | |
will be killed if the timeout is exceeded and an exception will be raised. | |
Inputs: | |
cmd (str): Command to execute | |
timeout (int): Command timeout in seconds | |
Outputs: | |
output (str): STDOUT/STDERR | |
""" | |
# Define the timeout signal | |
if timeout: | |
signal.signal(signal.SIGALRM, alarm_handler) | |
signal.alarm(timeout) | |
try: | |
# Execute the command and wait for the subprocess to terminate | |
# STDERR is redirected to STDOUT | |
phandle = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT) | |
# Read the stdout/sterr buffers and retcode | |
output, _ = phandle.communicate() | |
retcode = phandle.poll() | |
except Signal: | |
# Kill the running process | |
phandle.kill() | |
raise Timeout(cmd=cmd, timeout=timeout) | |
except: | |
raise | |
else: | |
# Possible race condition where alarm isn't disabled in time | |
signal.alarm(0) | |
# Raise an exception if the command exited with non-zero exit status | |
if retcode: | |
raise Retcode(cmd, retcode, output=output) | |
return output |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Dear
How to modify this code to use in milliseconds?
Tks
Luciano