Skip to content

Instantly share code, notes, and snippets.

@ReallyLiri
Created July 11, 2019 08:53
Show Gist options
  • Save ReallyLiri/18eb1306936da50253937166cc490843 to your computer and use it in GitHub Desktop.
Save ReallyLiri/18eb1306936da50253937166cc490843 to your computer and use it in GitHub Desktop.
Run external processes from Python
import gzip
import shutil
import logging
import os
from subprocess import Popen, PIPE, TimeoutExpired
# Exit status that run_process treats as a successful run.
SUCCESS_CODE = 0
# Upper bound, in seconds, that run_process waits for a process to finish.
TIMEOUT_SECONDS = 600
class ProcessSetupError(Exception):
    """Signals that an external process could not be started."""
class ProcessTimeoutError(Exception):
    """Signals that an external process exceeded its time limit."""
def pipe_to_str(bytes_str):
    """Convert captured process output to text.

    ``bytes`` input is decoded as UTF-8. Bytes that are not valid UTF-8 are
    replaced with U+FFFD instead of raising ``UnicodeDecodeError``, so
    arbitrary process output can always be converted. After decoding, every
    two-character sequence "backslash, n" is turned into a real newline.

    Any non-``bytes`` value is returned as ``str(value)``.
    """
    if isinstance(bytes_str, bytes):
        # errors='replace': process output is not guaranteed to be valid UTF-8.
        decoded = bytes_str.decode(errors='replace')
        return decoded.replace('\\n', '\n')
    return str(bytes_str)
def run_process(args, cwd=None, read_stdin_path=None, plot_stdout_path=None, append=False, env=None):
    """Run an external command, optionally streaming gzip files through it.

    Args:
        args: Command line as a list of strings; ``args[0]`` is used as the
            process name in log messages.
        cwd: Working directory for the process. A relative path is resolved
            against the current working directory.
        read_stdin_path: Optional path of a gzip file whose decompressed
            content is streamed to the process's stdin.
        plot_stdout_path: Optional path of a gzip file that receives the
            process's stdout. When given, the returned output is empty.
        append: Append to ``plot_stdout_path`` instead of overwriting it.
        env: Environment mapping handed to ``Popen``.

    Returns:
        A ``(succeeded, output)`` tuple. ``succeeded`` is True only when the
        process exits with ``SUCCESS_CODE``. ``output`` is the decoded stdout,
        or None when the run timed out or failed with an exception.

    Failures to start the process, streaming errors and timeouts are logged
    and reported as ``(False, None)``; they do not propagate to the caller.
    """
    import threading  # local import: only this function needs it

    process_name = args[0]
    logging.debug("Running '%s'", ' '.join(args))
    process = None
    pumps = []
    try:
        stdin = PIPE if read_stdin_path else None
        if cwd and not os.path.isabs(cwd):
            cwd = os.path.join(os.getcwd(), cwd)
        try:
            process = Popen(args, stdout=PIPE, stderr=PIPE, stdin=stdin, cwd=cwd, shell=False, env=env)
        except OSError as ex:
            # Popen never returns None; a process that cannot start raises.
            raise ProcessSetupError(
                "An Error occurred while trying to start process '{}'".format(process_name)) from ex

        captured = {'stdout': b'', 'stderr': b''}
        failures = []

        def feed_stdin():
            try:
                with gzip.open(read_stdin_path, 'rb') as source:
                    shutil.copyfileobj(source, process.stdin)
            finally:
                # The child only sees EOF on stdin once the pipe is closed.
                process.stdin.close()

        def drain_stdout():
            if plot_stdout_path:
                with gzip.open(plot_stdout_path, 'ab' if append else 'wb') as sink:
                    shutil.copyfileobj(process.stdout, sink)
            else:
                captured['stdout'] = process.stdout.read()

        def drain_stderr():
            captured['stderr'] = process.stderr.read()

        def guarded(task):
            def runner():
                try:
                    task()
                except Exception as ex:
                    failures.append(ex)
                    # Stop the child so the other pumps and the wait below
                    # are released instead of blocking on a stalled pipe.
                    try:
                        process.kill()
                    except OSError:
                        pass
            return runner

        # Every pipe is serviced concurrently: handling them one after the
        # other deadlocks as soon as the child fills a pipe nobody is reading.
        tasks = [drain_stdout, drain_stderr]
        if read_stdin_path:
            tasks.append(feed_stdin)
        for task in tasks:
            pump = threading.Thread(target=guarded(task), daemon=True)
            pump.start()
            pumps.append(pump)

        try:
            process.wait(timeout=TIMEOUT_SECONDS)
        except TimeoutExpired:
            logging.exception("Timeout of process '%s' after %d seconds", process_name, TIMEOUT_SECONDS)
            raise ProcessTimeoutError
        for pump in pumps:
            pump.join()
        if failures:
            raise failures[0]

        output = pipe_to_str(captured['stdout'])
        if output:
            logging.info("Process '%s' output:\n%s", process_name, output)
        errors = pipe_to_str(captured['stderr'])
        if errors:
            logging.info("Process '%s' errors:\n%s", process_name, errors)
        if process.returncode != SUCCESS_CODE:
            logging.error("process '%s' run failed, return code: %s, stderr: %s",
                          process_name, process.returncode, errors)
            return False, output
        return True, output
    except Exception:
        logging.exception("Process '%s' run failed with exception", process_name)
        return False, None
    finally:
        if process is not None:
            # A timeout or error must not leave the child running.
            try:
                if process.poll() is None:
                    process.kill()
                    process.wait()
            except OSError:
                pass
            for pump in pumps:
                pump.join()
            for pipe in (process.stdin, process.stdout, process.stderr):
                if pipe is not None:
                    try:
                        pipe.close()
                    except OSError:
                        pass
def exec_system(command):
    """Run *command* through ``os.system`` and insist that it succeeds.

    Returns None when ``os.system`` reports 0.

    Raises:
        RuntimeError: when ``os.system`` returns a non-zero value; the
            message carries the command and that value.
    """
    code = os.system(command)
    if code == 0:
        return
    raise RuntimeError(f"Command '{command}' failed with code {code}")
def wrap_args_with_ssh_access(args, ssh_access):
    """Build an ``ssh`` argument list that runs *args* on a remote host.

    The items of *args* are joined with single spaces, without any quoting,
    into one command string that follows *ssh_access* in the returned list.
    """
    remote_command = " ".join(args)
    return ['ssh', ssh_access, remote_command]
def wrap_args_with_pod_access(args, pod):
    """Build a ``kubectl exec -it <pod> --`` argument list followed by *args*.

    A new list is returned; *args* itself is not modified.
    """
    return ['kubectl', 'exec', '-it', pod, '--', *args]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment