Last active
July 21, 2022 14:38
-
-
Save awesomebytes/fd1dba778c3569d09702 to your computer and use it in GitHub Desktop.
Spawn a shell with a command with some extra options
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import subprocess | |
import tempfile | |
import os | |
import signal | |
class ShellCmd: | |
"""Helpful class to spawn commands and keep track of them""" | |
def __init__(self, cmd): | |
self.retcode = None | |
self.last_outf_byte = 0 | |
self.outf = tempfile.NamedTemporaryFile(mode="w") | |
self.errf = tempfile.NamedTemporaryFile(mode="w") | |
self.inf = tempfile.NamedTemporaryFile(mode="r") | |
self.process = subprocess.Popen(cmd, shell=True, stdin=self.inf, | |
stdout=self.outf, stderr=self.errf, | |
preexec_fn=os.setsid, close_fds=True) | |
def __del__(self): | |
if not self.is_done(): | |
self.kill() | |
self.outf.close() | |
self.errf.close() | |
self.inf.close() | |
def consume_stdout(self): | |
with open(self.outf.name, "r") as f: | |
f.seek(self.last_outf_byte, 0) | |
output = f.read() | |
self.last_outf_byte = f.tell() | |
return output | |
def get_stdout(self): | |
with open(self.outf.name, "r") as f: | |
return f.read() | |
def get_stderr(self): | |
with open(self.errf.name, "r") as f: | |
return f.read() | |
def get_retcode(self): | |
"""Get retcode or None if still running""" | |
if self.retcode is None: | |
self.retcode = self.process.poll() | |
return self.retcode | |
def is_done(self): | |
return self.get_retcode() is not None | |
def is_succeeded(self): | |
"""Check if the process ended with success state (retcode 0) | |
If the process hasn't finished yet this will be False.""" | |
return self.get_retcode() == 0 | |
def kill(self): | |
self.retcode = -1 | |
os.killpg(self.process.pid, signal.SIGTERM) | |
self.process.wait() | |
# Demonstration of usage | |
if __name__ == '__main__': | |
import time | |
cmd = ShellCmd("sleep 3") | |
try: | |
while not cmd.is_done(): | |
print("Still sleeping...") | |
time.sleep(0.5) | |
except KeyboardInterrupt: | |
print("Pressed Control+C, stopping command.") | |
cmd.kill() | |
if cmd.is_succeeded(): | |
print("The command finished succesfully") | |
print("It printed: " + str(cmd.get_stdout())) | |
else: | |
print("The command didn't finish succesfully") | |
print("Its stderr was: " + str(cmd.get_stdout())) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment