Skip to content

Instantly share code, notes, and snippets.

@ClePol
Last active May 15, 2023 14:03
Show Gist options
  • Save ClePol/70178c466f73d0cc73d64bb2f13897f1 to your computer and use it in GitHub Desktop.
Save ClePol/70178c466f73d0cc73d64bb2f13897f1 to your computer and use it in GitHub Desktop.
import argparse
import subprocess
import time
import sys
import select
import threading
import os
import errno
import shlex
import datetime
import readline
# TODO: cancel_all when this script crashes/is interrupted (e.g. by ctrl+c) - maybe use atexit module and add a commandline parameter
# TODO: benchmarking mode for measuring runtime of each process (avoid overloading the system,
# i.e. one process at a time), maybe setting max processes to 1 is enough
# TODO: write process state and run summary to file during execution (e.g. running, finished, cancelled, error, ...)
# TODO: add flag to indicate whether we want to wipe the process list after a run or not
# Module-level run state, shared between the main loop and the helper threads
# (the stdin reader thread and the timeout timer threads).
timers: list = []               # presumably meant for timeout timers; cleared after each run
finished_processes: list = []   # process_info dicts of processes that have ended (normally, by timeout or cancel)
cancelled_processes: list = []  # not appended to in this file; cleared after each run
process_infos: list = []        # process_info dicts of launched processes that are still monitored
STOP_INPUT: bool = False        # get_input's loop exits once this is True
LOGGING: bool = True            # kill_process only appends to the per-process log when True
def seconds_to_timestamp_string(seconds):
    """
    Formats a duration in seconds exactly as ``str(datetime.timedelta)`` does.

    Args:
        seconds (float): Duration in seconds
    Returns:
        str: e.g. '1:01:01' or '0:00:01.500000'; durations of one day or more
        are prefixed with 'N day(s), '
    """
    duration = datetime.timedelta(seconds=seconds)
    return str(duration)
def timestamp_string_to_seconds(timestamp):
    """
    Converts a timestamp string (as produced by seconds_to_timestamp_string,
    i.e. ``str(datetime.timedelta)``) back to a number of seconds.

    Accepted forms (surrounding whitespace is ignored):
        '18:15:27.106221'       hours:minutes:seconds with fraction
        '0:00:05'               whole seconds (timedelta omits '.000000')
        '1 day, 2:00:00.5'      durations of one day or more

    Args:
        timestamp (str): Timestamp string
    Returns:
        seconds (float): Number of seconds
    Raises:
        ValueError: if the string does not have one of the forms above
    """
    text = timestamp.strip()
    days = 0
    if ',' in text:
        # str(timedelta) renders durations >= 1 day as "N day(s), H:MM:SS[.ffffff]"
        day_part, text = text.split(',', 1)
        days = int(day_part.split()[0])
    hours, minutes, seconds = text.strip().split(':')
    # Build a timedelta so fractional seconds are rounded to microseconds,
    # matching the precision of the strings we parse.
    delta = datetime.timedelta(days=days, hours=int(hours), minutes=int(minutes),
                               seconds=float(seconds))
    return delta.total_seconds()
def kill_process(process_info):
    """
    Kills a monitored subprocess on user request and moves it to finished_processes.

    Args:
        process_info (dict): Entry of process_infos as built by launch_process
            (uses the 'process', 'name' and 'logfile' keys; sets
            'manual_termination' and 'end_time').
    """
    # Kill the process using the kill() method of the Popen object
    process_info['process'].kill()
    # Log the manual termination only when logging is on and the per-process log
    # exists: without a log directory the 'logfile' path ("None/<name>.log")
    # normally does not exist, and opening it would crash the input thread.
    if LOGGING and os.path.isfile(process_info['logfile']):
        with open(process_info['logfile'], "a") as f:
            f.write(f"\nSubprocess {process_info['name']} terminated manually\n")
    process_info['manual_termination'] = True
    process_info['end_time'] = time.time()
    # The monitor loop or a timeout may already have retired this entry;
    # list.remove would then raise ValueError.
    if process_info in process_infos:
        finished_processes.append(process_info)
        process_infos.remove(process_info)
    print(f"{process_info['name']} has been cancelled.")
def get_input():
    """
    Interactive command loop, run in a daemon thread while processes are monitored.

    Reads one command per line from stdin until STOP_INPUT is true or stdin
    reaches end-of-file:
        list            print the running processes
        cancel <name>   kill the process with that name
        cancel_all      kill every running process
    """
    doc_msg = "Enter 'list' to list running processes " \
              "or 'cancel <name>' to cancel a process and 'cancel_all' to cancel all running processes:"
    while not STOP_INPUT:
        # Prompt the user to list running processes or cancel a process
        print(f"\n{doc_msg}")
        raw_line = sys.stdin.readline()
        if raw_line == '':
            # readline() returns '' only at EOF (stdin closed/redirected, e.g. a
            # background job). Without this check the loop would spin forever,
            # printing "Invalid input" on every iteration.
            print("stdin closed - interactive commands disabled.")
            break
        user_input = raw_line.strip()
        if user_input == 'list':
            # Print the list of running processes
            print_running_processes(process_infos)
        elif user_input.startswith('cancel '):
            # Extract the name of the process to cancel from the user input
            name_to_cancel = user_input.split(maxsplit=1)[1]
            # Iterate over a copy: kill_process removes the entry from process_infos
            for process_info in process_infos.copy():
                if process_info['name'] == name_to_cancel:
                    kill_process(process_info)
                    break
            else:
                print(f"No process found with name '{name_to_cancel}'.")
        elif user_input == 'cancel_all':
            for process_info in process_infos.copy():
                kill_process(process_info)
            print('cancelled all processes')
        else:
            # Invalid input
            print(f"Invalid input. {doc_msg}")
# Define a function to launch a process and store its information
def _previous_run_finished(logfile):
    """Return True if logfile ends with a 'finished with exitcode' line for a run of at least 5 s."""
    last_line = None
    with open(logfile, mode='r') as f:
        # Stream to the last line instead of loading a potentially large log into memory.
        for last_line in f:
            pass
    if last_line is None:
        print(f'found empty logfile {logfile} - replacing')
        return False
    if 'finished with exitcode' not in last_line:
        print(f'found incomplete run - replacing logfile {logfile}')
        return False
    try:
        # The runtime follows the last " in "; it may contain spaces itself
        # (e.g. "1 day, 0:00:03.5"), so splitting on ' ' would drop the days.
        runtime = timestamp_string_to_seconds(last_line.rsplit(' in ', 1)[-1])
    except ValueError:
        print(f'could not read the runtime from {logfile} - replacing')
        return False
    if runtime < 5:
        print(f'found run terminated after {runtime} seconds - re-running and replacing logfile {logfile}')
        return False
    return True


def launch_process(sub_command, name, log, max_runtime, resume=False):
    """
    Launches sub_command in a shell subprocess and logs its output to <log>/<name>.log.

    Args:
        sub_command (str): Command that should be launched in a subprocess. It runs
            with shell=True, so it must come from a trusted source (the command file).
        name (str): Name given to the command; also the basename of its logfile
        log (str): Directory where logfiles should be saved. '' or None if no logs
            should be created (the command's output is then discarded)
        max_runtime (float): Timeout for the subprocess in seconds. If 0, no timeout.
        resume (bool): If True and a logfile from an earlier run exists, skip the
            command when that log ends with a finished run lasting at least 5 seconds.
    Returns:
        None if the command was skipped because of resume, otherwise
        process_info (dict):
            logfile (str): Path of the per-process logfile
            command (str): Command that has been launched in subprocess
            name (str): Name given to the command
            process (subprocess.Popen): Subprocess object
            start_time (float): Start time (time.time())
            end_time (None): Set once the process has ended
            manual_termination (bool): False until cancelled by the user
            timed_out (bool): False until killed by the timeout
    """
    logfile = f"{log}/{name}.log"
    log_handle = None
    if log != '' and log is not None:
        header = 'starting'
        if resume and os.path.isfile(logfile):
            if _previous_run_finished(logfile):
                print(f'process {name} already finished -- skipping {name}')
                return None  # don't start the process
            header = 'restarting'
        log_handle = open(logfile, mode='w')
        log_handle.write(f"\n\n{header} at {datetime.datetime.now()}\n\n")
        # Flush before the child inherits the descriptor so the header appears
        # ahead of the child's own output instead of wherever the buffer lands.
        log_handle.flush()
    # Without a log directory the output is discarded. A never-read PIPE would
    # block the child as soon as the pipe buffer filled up.
    output = log_handle if log_handle is not None else subprocess.DEVNULL
    try:
        process = subprocess.Popen(sub_command, shell=True, stdout=output, stderr=output,
                                   universal_newlines=True)
    finally:
        # The child keeps its own copy of the descriptor; the parent's is no longer needed.
        if log_handle is not None:
            log_handle.close()
    # Store the process information in a dictionary
    process_info = {
        'logfile': logfile,
        'command': sub_command,
        'name': name,
        'process': process,
        'start_time': time.time(),
        'end_time': None,
        'manual_termination': False,
        'timed_out': False
    }
    # Start a timer thread if a timeout is given
    if max_runtime and max_runtime > 0:
        timer_thread = threading.Timer(max_runtime, timeout, [process])
        # Daemon thread so that a pending timer doesn't prevent the program from exiting
        timer_thread.daemon = True
        timer_thread.start()
        timers.append(timer_thread)
    return process_info
def timeout(process):
    """
    Timer callback: kills process if it is still running and records it as timed out.

    The matching entry is moved from process_infos to finished_processes with
    'timed_out' set, and a note is appended to its logfile when that file exists.

    Args:
        process (subprocess.Popen): Process for termination
    """
    if process.poll() is not None:
        return
    matches = [info for info in process_infos if info['process'] is process]
    if not matches:
        # Already retired by the monitor loop or cancelled by the user.
        return
    process_info = matches[0]
    try:
        process.kill()
    except OSError as e:
        if e.errno != errno.ESRCH:
            raise
        return  # process vanished before we could kill it
    message = f"Subprocess {process_info['name']} reached timeout limit. " \
              f"Terminating early\n"
    print(message)
    # Append rather than truncate so the output captured so far is kept. The
    # logfile only exists when logging is enabled (launch_process creates it).
    if os.path.isfile(process_info['logfile']):
        with open(process_info['logfile'], "a") as f:
            f.write("\n" + message)
    process_info['timed_out'] = True
    process_info['end_time'] = time.time()
    if process_info in process_infos:
        finished_processes.append(process_info)
        process_infos.remove(process_info)
# Define a function to print the list of running processes
def print_running_processes(process_infos):
    """
    Prints a header followed by one line per process that has not exited yet.

    Args:
        process_infos (list): process_info dicts with 'name', 'process' and
            'start_time' keys; entries whose process has exited are skipped
    """
    print("Running processes:")
    still_running = (entry for entry in process_infos if entry['process'].poll() is None)
    for entry in still_running:
        elapsed = time.time() - entry['start_time']
        pid = entry['process'].pid
        print(f"{entry['name']} (PID {pid}) has been running for "
              + seconds_to_timestamp_string(elapsed))
def check_processes(process_infos, args, main_logfile, force_write=False):
    """
    Retires finished processes and writes a status update to the main logfile.

    Each process whose poll() returns an exit code gets an 'end_time' and is
    moved from process_infos to the module-level finished_processes list. Its
    completion message is printed and appended to its own logfile. The main
    logfile is only updated when a process has terminated or force_write is set.

    Args:
        process_infos (list): List of process_info dicts (see launch_process);
            finished entries are removed from it in place
        args (argparse.Namespace): Command line arguments (uses .logdir and .verbose)
        main_logfile (str): Path to main logfile (unused when args.logdir is None or '')
        force_write (bool): If True, write to main logfile even if no processes have finished
    """
    date_time_str = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
    logging_enabled = args.logdir is not None and args.logdir != ''
    # Main-log lines are collected and written in one go at the end, so the file
    # is only opened when there is something to write and is always closed.
    main_lines = [f"\n{info['name']} terminated at {date_time_str}\n\n"
                  for info in process_infos.copy() if info['process'].poll() is not None]
    if not main_lines and not force_write:
        return
    update_type = '\nregular' if force_write else 'event'
    main_lines.append(f"{update_type} update at {date_time_str}\n")

    # Loop over a copy so entries can be removed from process_infos meanwhile
    for process_info in process_infos.copy():
        if process_info not in process_infos:
            # Retired concurrently by kill_process or timeout (other threads)
            continue
        return_code = process_info['process'].poll()
        if return_code is None:
            run_time = time.time() - process_info['start_time']
            main_lines.append(f"\t{process_info['name']} (PID {process_info['process'].pid}) "
                              "has been running for " + seconds_to_timestamp_string(run_time) + '\n')
            continue
        # The process has completed
        process_info['end_time'] = time.time()
        finished_processes.append(process_info)
        run_time = process_info['end_time'] - process_info['start_time']
        # Print the captured output if verbose (only exists when logging is enabled)
        if args.verbose and logging_enabled:
            with open(process_info['logfile'], "r") as f:
                print(f.read())
        message = f"{process_info['name']} finished with" \
                  f" exitcode {return_code} in " + seconds_to_timestamp_string(run_time) + "\n"
        print(message)
        if logging_enabled:
            with open(process_info['logfile'], "a") as f:
                f.write('\t' + message)
        process_infos.remove(process_info)

    if not logging_enabled:
        return
    for process in finished_processes:
        return_code = process['process'].poll()
        run_time_str = seconds_to_timestamp_string(process['end_time'] - process['start_time'])
        # run_time_str is already H:MM:SS[.ffffff], so no "seconds" unit is appended
        main_lines.append(f"\t{process['name']} finished with exitcode {return_code} in {run_time_str}\n")
    with open(main_logfile, "a") as main_f:
        main_f.writelines(main_lines)
def _prepare_main_log(args, main_logfile):
    """Create the log directory and write the starting/resuming header to main.log."""
    if not os.path.exists(args.logdir):
        os.makedirs(args.logdir)
    if os.path.isfile(main_logfile) and not args.resume:
        print(f'{main_logfile} already exists.')
        if not args.replace:
            sys.exit(0)
        os.remove(main_logfile)
    action = 'resuming' if args.resume else 'starting'
    with open(main_logfile, 'a') as main_f:
        main_f.write(f"\n\n{action} at {datetime.datetime.now()}\n\n")


def _print_summary():
    """Print one outcome line per finished process to the console."""
    print("")
    print("---------------------------------------------------")
    print("Summary: ")
    for process in finished_processes:
        return_code = process['process'].poll()
        run_time = process['end_time'] - process['start_time']
        if process['timed_out']:
            finish_message = 'timed out'
        elif return_code == 0:
            finish_message = 'finished successfully - '
        elif process['manual_termination']:
            finish_message = 'terminated by user'
        else:
            finish_message = 'finished with non-zero'
        print(f"Process ({process['name']}) " + finish_message +
              f" exitcode {return_code} in " + seconds_to_timestamp_string(run_time))
    print("All processes have completed.")


def _summary_row(process):
    """Return the summary-table fields (as strings) for one finished process."""
    return_code = process['process'].poll()
    run_time = process['end_time'] - process['start_time']
    return [process['name'], f"{run_time:.2f}", str(return_code), process['command'],
            process['logfile'], str(process['manual_termination']),
            str(process['timed_out']), str(return_code == 0)]


def _write_summary_files(logdir, main_logfile):
    """Write run_summary.csv to logdir and append the same table to main.log."""
    import csv  # only needed when logging is enabled
    header = ["Name", "Runtime", "Exitcode", "Command", "Logfile",
              "Manual termination", "Timed out", "Success"]
    rows = [_summary_row(process) for process in finished_processes]
    with open(os.path.join(logdir, 'run_summary.csv'), 'w', newline='') as table_f:
        print('logging table to run_summary.csv')
        # csv.writer quotes fields (e.g. commands) that contain commas or quotes
        writer = csv.writer(table_f, lineterminator='\n')
        writer.writerow(header)
        writer.writerows(rows)
    with open(main_logfile, 'a') as main_f:
        main_f.write("---------------------------------------------------\n")
        main_f.write("Summary table:\n")
        main_f.write("---------------------------------------------------\n")
        main_f.write("Name\t\t\tRuntime\t\tExitcode\t\tCommand\t\tLogfile\t\tManual termination\t\tTimed out\t\tSuccess\n")
        # Every row now carries all eight header columns
        for row in rows:
            main_f.write(row[0] + "\t\t\t" + "\t\t".join(row[1:]) + "\n")


def launch_processes_interactive(args):
    """
    Runs every command of args.input with at most args.max processes at a time.

    Reads "<command>, <name>" lines from args.input and launches each command via
    launch_process. The processes are monitored with check_processes while a
    daemon thread (get_input) accepts list/cancel commands on stdin. At the end it
    prints a summary and, when a log directory is given, writes run_summary.csv
    and a summary table to main.log. Afterwards the module-level process lists
    are cleared.

    args: Namespace object with following attributes:
        input (str): Path to text file containing commands to execute
        max (int): Maximum number of processes that should run simultaneously
        verbose (bool): Whether or not to print the output of the processes
        logdir (str): Path where logfiles should be saved. '' or None if no logs should be created
        time (float): Maximum time a subprocess can run in seconds. If 0 then no timeout. Default = 0
        check_interval (float): Interval in seconds to check for termination of processes. Default = 0.1
        replace (bool): Whether or not to replace an existing main logfile. Default = False
        resume (bool): Whether or not to resume existing logfiles. Default = False
    Raises:
        FileNotFoundError: if resume is requested but <logdir>/main.log does not exist
    """
    # Without this declaration the assignments below only create locals:
    # get_input would never see STOP_INPUT become True, and kill_process
    # would keep the module default LOGGING = True even without a log directory.
    global STOP_INPUT, LOGGING

    # Thread for user input; daemon so it doesn't prevent the program from exiting
    input_thread = threading.Thread(target=get_input)
    input_thread.daemon = True

    if args.logdir is not None and args.logdir != '':
        main_logfile = f"{args.logdir}/main.log"
        # Explicit check instead of assert (asserts are stripped under python -O)
        if args.resume and not os.path.isfile(main_logfile):
            raise FileNotFoundError(f'log file {main_logfile} does not exist - cannot resume')
        LOGGING = True
        _prepare_main_log(args, main_logfile)
    else:
        main_logfile = None
        LOGGING = False

    # A limit below 1 would make the "wait for a free slot" loop spin forever
    max_parallel = max(1, args.max)
    no_checks = 0
    STOP_INPUT = False
    input_thread.start()

    # Each non-empty line of the input file is "<command>, <name>"
    with open(args.input, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            if ',' not in line:
                print(f"skipping malformed line (expected '<command>, <name>'): {line}")
                continue
            # Split on the LAST comma so the command itself may contain commas
            command, name = line.rsplit(',', maxsplit=1)
            command, name = command.strip(), name.strip()
            # Wait until fewer than max_parallel processes are running
            while len(process_infos) >= max_parallel:
                check_processes(process_infos, args, main_logfile, force_write=no_checks % 100 == 0)
                time.sleep(args.check_interval)
                no_checks += 1
            process_info = launch_process(sub_command=command, name=name,
                                          log=args.logdir, max_runtime=args.time, resume=args.resume)
            if process_info is None:
                continue  # skipped (already finished in a previous run)
            print('Launched', name)
            process_infos.append(process_info)

    # Main loop to monitor running processes until all of them have ended
    while process_infos:
        check_processes(process_infos, args, main_logfile, force_write=no_checks % 100 == 0)
        time.sleep(args.check_interval)
        no_checks += 1
    STOP_INPUT = True

    _print_summary()
    if LOGGING:
        _write_summary_files(args.logdir, main_logfile)

    # Cancel any timeout timers still pending, then clear the global lists so no
    # information is retained from this execution (could be a feature)
    for timer in timers:
        timer.cancel()
    timers.clear()
    finished_processes.clear()
    cancelled_processes.clear()
    process_infos.clear()
def build_arg_parser():
    """
    Builds the command line parser for this script.

    Returns:
        argparse.ArgumentParser: parser whose namespace provides input, max,
        verbose, logdir, time, check_interval, replace and resume
    """
    parser = argparse.ArgumentParser(
        description='Run the commands listed in a text file in parallel and log the output of each one.')
    parser.add_argument('-i', '--input', type=str, required=True,
                        help='input file with commands to execute in parallel, one per line, '
                             'in the format: <command>, <name>')
    parser.add_argument('--max_p', type=int, required=False, dest='max', default=3,
                        help='Maximum number of subprocesses that should run simultaneously (default: 3)')
    parser.add_argument('--verbose', action='store_true',
                        help="print commands output to terminal")
    parser.add_argument('-l', '--logdir', type=str, dest="logdir", default=None,
                        help="Path where logfiles should be saved. default: no logs are created")
    parser.add_argument('--max_t', type=float, required=False, dest='time', default=0.0,
                        help="Maximum time a subprocess can run in seconds (default: 0 = no limit)")
    # float, not int: the default (0.1 s) is itself fractional
    parser.add_argument('--check_interval', type=float, default=0.1,
                        help='interval in seconds to check for termination of processes (default: 0.1)')
    parser.add_argument('--replace', action='store_true',
                        help='replace an existing main.log in --logdir instead of aborting')
    parser.add_argument('--resume', action='store_true',
                        help='resume a previous run in --logdir, skipping commands whose logfile '
                             'records a finished run of at least 5 seconds')
    return parser


if __name__ == '__main__':
    # This script runs multiple commands in parallel. It takes a text file as input,
    # where each line contains a command and a name for the command, separated by a comma.
    # The commands are run in parallel and the output of each one is logged to a separate file.
    # The running processes are monitored, and the user can cancel them if they take too long.

    # Safeguard against FreeSurfer commands failing en masse: require FreeSurfer to be sourced
    # (PATH may be unset, hence the '' default).
    if '/fsfast/bin' not in os.environ.get('PATH', ''):
        print('FreeSurfer is not sourced - please source freesurfer before running this script')
        sys.exit(1)
    print('freesurfer is sourced')
    args = build_arg_parser().parse_args()
    launch_processes_interactive(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment