Last active
August 22, 2023 02:27
-
-
Save plutec/145254e5a7c200afe687 to your computer and use it in GitHub Desktop.
Class to create a subprocess with a watchdog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import subprocess | |
import datetime | |
class Subprocess(object):
    """Run a command as a child process guarded by a watchdog timeout.

    Typical use: build the object, call :meth:`run` to start the child,
    then call :meth:`wait` with the maximum number of seconds the child
    may live (counted from the moment :meth:`run` started it).

    Attributes:
        cmd: Command passed to ``subprocess.Popen`` (normally a list).
        cwd: Working directory for the child, or ``None`` to inherit.
        stdout: Value forwarded as ``Popen``'s ``stdout`` argument.
        process: The ``Popen`` object once :meth:`run` has been called.
        start_time: ``datetime`` recorded right after the child started.
        thread: Unused by this class; kept for backward compatibility.
    """

    # Upper bound, in seconds, on a single sleep between two polls.
    _POLL_INTERVAL = 1

    def __init__(self, cmd, cwd=None, stdout=None):
        self.cmd = cmd
        self.cwd = cwd
        self.process = None
        self.stdout = stdout
        self.thread = None
        # Defined up front so the attribute always exists, even before run().
        self.start_time = None

    def run(self):
        """Start the child process and record its start time."""
        self.process = subprocess.Popen(self.cmd,
                                        stdout=self.stdout,
                                        cwd=self.cwd)
        self.start_time = datetime.datetime.now()

    def wait(self, timeout):
        """Wait for the child to exit, killing it once *timeout* elapses.

        Args:
            timeout: Maximum lifetime of the child in seconds, measured
                from the time :meth:`run` started it.

        Returns:
            The child's exit code, or ``None`` if it was still running when
            the timeout expired and therefore had to be killed.

        Raises:
            RuntimeError: If :meth:`run` has not been called yet.
        """
        if self.process is None:
            raise RuntimeError("run() must be called before wait()")

        while self.process.poll() is None:
            elapsed = (datetime.datetime.now()
                       - self.start_time).total_seconds()
            remaining = timeout - elapsed
            if remaining <= 0:
                break
            # Never sleep past the deadline, so short timeouts are honoured.
            time.sleep(min(self._POLL_INTERVAL, remaining))

        returncode = self.process.poll()
        if returncode is None:
            print("Killing process (timeout) %s" % (self.cmd,))
            try:
                self.process.kill()
            except OSError:
                # The child could not be signalled (e.g. it exited between
                # the poll above and the kill); nothing left to do.
                pass
            else:
                # Reap the killed child so it does not linger as a zombie.
                self.process.wait()
        return returncode
# Example usage. Guarded so that importing this module does not spawn a
# process and block for up to 30 seconds as a side effect.
if __name__ == "__main__":
    # NOTE: stdout is a pipe that this example never reads; a child that
    # writes more than the pipe buffer holds will stall until the watchdog
    # kills it.
    my_process = Subprocess(cmd=["./process", "param1", "param2"],
                            stdout=subprocess.PIPE,
                            cwd="/home/user/path")
    my_process.run()
    # Waiting time in seconds; the result is None when the child was killed.
    returncode = my_process.wait(30)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment