Last active
May 15, 2023 14:03
-
-
Save ClePol/70178c466f73d0cc73d64bb2f13897f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import subprocess | |
import time | |
import sys | |
import select | |
import threading | |
import os | |
import errno | |
import shlex | |
import datetime | |
import readline | |
# TODO: cancel_all when this script crashes or is interrupted (e.g. by Ctrl+C) - maybe use
#       the atexit module and add a command-line parameter
# TODO: benchmarking mode for measuring the runtime of each process without overloading
#       the system (i.e. one process at a time); setting the maximum number of processes
#       to 1 may already be enough
# TODO: write process state and run summary to a file during execution (e.g. running,
#       finished, cancelled, error, ...)
# TODO: add a flag to choose whether the process lists are wiped after a run

# --- Module-level state -------------------------------------------------------
# Shared by the main monitoring loop, the interactive input thread and the
# timeout timer threads.
timers: list = []               # cleared after each run; no code in view appends to it
finished_processes: list = []   # process-info dicts of processes that have ended
cancelled_processes: list = []  # cleared after each run; no code in view appends to it
process_infos: list = []        # process-info dicts of processes still being tracked
STOP_INPUT: bool = False        # checked by the input loop before each prompt
LOGGING: bool = True            # checked by kill_process before writing a process log
def seconds_to_timestamp_string(seconds):
    """
    Format a duration in seconds as a ``datetime.timedelta`` string.

    Args:
        seconds (float): Duration in seconds.

    Returns:
        str: ``str(datetime.timedelta(seconds=seconds))``, e.g. ``'0:01:05.500000'``;
        the fractional part is omitted for whole seconds and durations of a day or
        more get a ``'N day(s), '`` prefix.
    """
    duration = datetime.timedelta(seconds=seconds)
    formatted = str(duration)
    return formatted
def timestamp_string_to_seconds(timestamp):
    """
    Convert a ``str(datetime.timedelta)``-style timestamp back to seconds.

    Accepts ``[D day[s], ]H:MM:SS[.ffffff]``, e.g. ``'18:15:27.106221'``,
    ``'0:00:05'`` (whole seconds have no fractional part) or
    ``'1 day, 2:00:00.500000'``. Surrounding whitespace is ignored.

    Args:
        timestamp (str): Timestamp string.

    Returns:
        seconds (float): Number of seconds.

    Raises:
        ValueError: If ``timestamp`` is not in the expected format.
    """
    text = timestamp.strip()
    days = 0
    if ',' in text:
        # timedelta prefixes durations of 24h or more with "N day(s), "
        day_part, text = text.split(',', 1)
        day_tokens = day_part.split()
        if len(day_tokens) != 2 or day_tokens[1] not in ('day', 'days'):
            raise ValueError(f"invalid timestamp: {timestamp!r}")
        days = int(day_tokens[0])
        text = text.strip()
    parts = text.split(':')
    if len(parts) != 3:
        raise ValueError(f"invalid timestamp: {timestamp!r}")
    hours, minutes, seconds = int(parts[0]), int(parts[1]), float(parts[2])
    # Going through timedelta rounds to whole microseconds, as the previous
    # strptime-based parsing did.
    delta = datetime.timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
    return delta.total_seconds()
def kill_process(process_info):
    """
    Cancel a tracked process on user request.

    Kills the subprocess, flags the entry as manually terminated, moves it from
    ``process_infos`` to ``finished_processes`` and notes the cancellation in the
    process's log file when that file exists.

    Args:
        process_info (dict): Process-info dict as created by ``launch_process``.
    """
    # Kill the process using the kill() method of the Popen object
    process_info['process'].kill()
    # Set the flags first so the run summary reports a manual termination even if
    # the main loop reaps the dead process before we remove it below.
    process_info['manual_termination'] = True
    process_info['end_time'] = time.time()
    try:
        process_infos.remove(process_info)
    except ValueError:
        # The main loop already noticed the exit and moved the entry to
        # finished_processes; don't record it twice (or crash the input thread).
        pass
    else:
        finished_processes.append(process_info)
    # The stored log path is built even when logging is disabled (e.g.
    # "None/<name>.log"), so only append to a log file that actually exists.
    logfile = process_info['logfile']
    if LOGGING and os.path.isfile(logfile):
        with open(logfile, "a") as f:
            f.write(f"Subprocess {process_info['name']} terminated manually\n")
    print(f"{process_info['name']} has been cancelled.")
def get_input():
    """
    Interactive command loop, meant to run in a background (daemon) thread.

    Repeatedly prompts on stdout and reads one line from stdin:

    * ``list``          -- print the running processes
    * ``cancel <name>`` -- kill the tracked process called ``<name>``
    * ``cancel_all``    -- kill every tracked process

    The loop ends when the module-level ``STOP_INPUT`` flag is set (checked
    before each prompt) or when stdin reaches end-of-file.
    """
    doc_msg = "Enter 'list' to list running processes " \
              "or 'cancel <name>' to cancel a process and 'cancel_all' to cancel all running processes:"
    while not STOP_INPUT:
        # Prompt the user to list running processes or cancel a process
        print(f"\n{doc_msg}")
        raw_line = sys.stdin.readline()
        if raw_line == '':
            # readline() returns '' only at EOF (an empty line is '\n'); without this
            # check the loop would spin forever printing "Invalid input".
            break
        user_input = raw_line.strip()
        if user_input == 'list':
            print_running_processes(process_infos)
        elif user_input.startswith('cancel '):
            name_to_cancel = user_input.split(maxsplit=1)[1]
            target = next((info for info in process_infos.copy()
                           if info['name'] == name_to_cancel), None)
            if target is None:
                print(f"No process found with name '{name_to_cancel}'.")
            else:
                kill_process(target)
        elif user_input == 'cancel_all':
            for process_info in process_infos.copy():
                kill_process(process_info)
            print('cancelled all processes')
        else:
            print(f"Invalid input. {doc_msg}")
def launch_process(sub_command, name, log, max_runtime, resume=False):
    """
    Launch ``sub_command`` in a shell subprocess and return its bookkeeping record.

    When logging is enabled, stdout and stderr of the command go to
    ``<log>/<name>.log``, preceded by a "starting at" / "restarting at" header.

    Args:
        sub_command (str): Command to run; executed through the shell (``shell=True``).
        name (str): Name given to the command; used for the log file name.
        log (str | None): Directory where log files are saved. ``''`` or ``None``
            disables logging (the command's output is then discarded).
        max_runtime (float | None): Timeout in seconds; ``0``, a negative value or
            ``None`` means no timeout. Enforced by a daemon timer calling ``timeout``.
        resume (bool): If True and ``<log>/<name>.log`` exists, the command is skipped
            when the log's last line reports it finished after at least 5 seconds;
            otherwise it is re-run and the log file replaced.

    Returns:
        process_info (dict | None): ``None`` if the command was skipped; otherwise a
            dict with keys ``logfile``, ``command``, ``name``, ``process``
            (``subprocess.Popen``), ``start_time``, ``end_time`` (``None``),
            ``manual_termination`` and ``timed_out`` (both ``False``).
    """
    logfile = f"{log}/{name}.log"
    logging_enabled = log is not None and log != ''

    if logging_enabled:
        header = 'starting'
        if resume and os.path.isfile(logfile):
            with open(logfile, mode='r') as f:
                lines = f.readlines()
            if not lines:
                print(f'found empty logfile {logfile} - replacing')
            elif 'finished with exitcode' in lines[-1]:
                runtime = timestamp_string_to_seconds(lines[-1].split(' ')[-1])
                if runtime < 5:
                    # Runs that finished in under 5 seconds are re-run.
                    print(f'found run terminated after {runtime} seconds - re-running and replacing logfile {logfile}')
                else:
                    print(f'process {name} already finished -- skipping {name}')
                    return None  # dont start process
            else:
                print(f'found incomplete run - replacing logfile {logfile}')
            header = 'restarting'
        output = open(logfile, mode='w')
        output.write(f"\n\n{header} at {datetime.datetime.now()}\n\n")
        # Flush before the child inherits the descriptor: both write through the
        # same file offset, so unflushed header text would end up after the
        # child's output.
        output.flush()
    else:
        # Nothing ever reads the child's output when logging is disabled, and an
        # unread PIPE blocks the child once the OS pipe buffer is full.
        output = subprocess.DEVNULL

    try:
        process = subprocess.Popen(sub_command, shell=True, stdout=output,
                                   stderr=output, universal_newlines=True)
    finally:
        if logging_enabled:
            # The child holds its own copy of the descriptor; release ours.
            output.close()

    process_info = {
        'logfile': logfile,
        'command': sub_command,
        'name': name,
        'process': process,
        'start_time': time.time(),
        'end_time': None,
        'manual_termination': False,
        'timed_out': False
    }
    if max_runtime is not None and max_runtime > 0:
        timer_thread = threading.Timer(max_runtime, timeout, [process])
        # Daemon thread so a pending timer doesn't keep the program alive.
        timer_thread.daemon = True
        timer_thread.start()
    return process_info
def timeout(process):
    """
    Timer callback that enforces a process's maximum runtime.

    If ``process`` is still running it is killed. When it is tracked in
    ``process_infos``, the entry is flagged ``timed_out``, given an ``end_time``
    and moved to ``finished_processes``, and a timeout note is printed and
    appended to its log file (if that file exists).

    Args:
        process (subprocess.Popen): Process for termination.
    """
    if process.poll() is not None:
        return  # finished on its own before the deadline
    process_info = next((info for info in process_infos.copy()
                         if info['process'] is process), None)
    try:
        process.kill()
    except OSError as e:
        # ESRCH: the process exited between poll() and kill()
        if e.errno != errno.ESRCH:
            raise
    if process_info is None:
        return  # not (or no longer) tracked: nothing to record
    process_info['timed_out'] = True
    process_info['end_time'] = time.time()
    try:
        process_infos.remove(process_info)
    except ValueError:
        pass  # another thread already moved it to finished_processes
    else:
        finished_processes.append(process_info)
    message = f"Subprocess {process_info['name']} reached timeout limit. " \
              f"Terminating early\n"
    print(message)
    logfile = process_info['logfile']
    if os.path.isfile(logfile):
        # Append so the output the process already wrote is preserved.
        with open(logfile, "a") as f:
            f.write(message)
def print_running_processes(process_infos):
    """
    Print every process from ``process_infos`` that is still running.

    A process counts as running while ``poll()`` returns ``None``; each one is
    printed with its name, PID and elapsed time since ``start_time``.

    Args:
        process_infos (list): Process-info dicts as created by ``launch_process``.
    """
    print("Running processes:")
    still_running = (entry for entry in process_infos
                     if entry['process'].poll() is None)
    for entry in still_running:
        elapsed = time.time() - entry['start_time']
        header = f"{entry['name']} (PID {entry['process'].pid}) "
        print(header + "has been running for " + seconds_to_timestamp_string(elapsed))
def check_processes(process_infos, args, main_logfile, force_write=False):
    """
    Reap finished processes and write a status update to the main log.

    Nothing happens unless at least one process has finished or ``force_write``
    is set. Every process whose ``poll()`` returns an exit code is removed from
    ``process_infos``, appended to the module-level ``finished_processes`` list,
    reported on stdout and, when logging, noted in its own log file.

    When logging is enabled (``args.logdir`` is neither ``None`` nor ``''``), the
    main log receives a "terminated at" line for the first finished process
    found, an "event"/"regular" update header, one line per still-running
    process and one line per entry of ``finished_processes``.

    Args:
        process_infos (list): Process-info dicts as created by ``launch_process``;
            modified in place.
        args (argparse.Namespace): Command line arguments (``logdir``, ``verbose``).
        main_logfile (str | None): Path to the main logfile (unused without logging).
        force_write (bool): If True, write to the main logfile even if no process
            has finished.
    """
    date_time_str = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
    logging_enabled = args.logdir is not None and args.logdir != ''

    first_finished = next(
        (info for info in process_infos.copy() if info['process'].poll() is not None),
        None)
    if first_finished is None and not force_write:
        return

    # Opened only when there is something to write, and always closed again.
    main_f = open(main_logfile, "a") if logging_enabled else None
    try:
        if main_f is not None:
            if first_finished is not None:
                main_f.write(f"\n{first_finished['name']} terminated at {date_time_str}\n\n")
            update_type = '\nregular' if force_write else 'event'
            main_f.write(f"{update_type} update at {date_time_str}\n")

        # Iterate over a copy because finished entries are removed from the list.
        for process_info in process_infos.copy():
            return_code = process_info['process'].poll()
            if return_code is None:
                if main_f is not None:
                    run_time = time.time() - process_info['start_time']
                    main_f.write(f"\t{process_info['name']} (PID {process_info['process'].pid}) "
                                 "has been running for " + seconds_to_timestamp_string(run_time) + '\n')
                continue
            try:
                process_infos.remove(process_info)
            except ValueError:
                # Another thread (user cancel / timeout handler) already moved it.
                continue
            process_info['end_time'] = time.time()
            finished_processes.append(process_info)
            run_time = process_info['end_time'] - process_info['start_time']
            # The process output is only captured in its log file when logging is on.
            if args.verbose and logging_enabled:
                with open(process_info['logfile'], "r") as log_f:
                    print(log_f.read())
            message = f"{process_info['name']} finished with" \
                      f" exitcode {return_code} in " + seconds_to_timestamp_string(run_time) + "\n"
            print(message)
            if logging_enabled:
                with open(process_info['logfile'], "a") as log_f:
                    log_f.write('\t' + message)

        if main_f is not None:
            for process in finished_processes:
                return_code = process['process'].poll()
                run_time_str = seconds_to_timestamp_string(process['end_time'] - process['start_time'])
                main_f.write(f"\t{process['name']} finished with"
                             f" exitcode {return_code} in"
                             f" {run_time_str}"
                             f" seconds\n")
    finally:
        if main_f is not None:
            main_f.close()
def _print_run_summary(finished):
    """
    Print one line per finished process describing how it ended.

    Args:
        finished (list): Process-info dicts (as created by ``launch_process``)
            whose ``end_time`` has been set.
    """
    print("")
    print("---------------------------------------------------")
    print("Summary: ")
    for process in finished:
        return_code = process['process'].poll()
        run_time = process['end_time'] - process['start_time']
        if process['timed_out']:
            finish_message = 'timed out'
        elif return_code == 0:
            finish_message = 'finished successfully - '
        elif process['manual_termination']:
            finish_message = 'terminated by user'
        else:
            finish_message = 'finished with non-zero'
        print(f"Process ({process['name']}) " + finish_message +
              f" exitcode {return_code} in " + seconds_to_timestamp_string(run_time))
    print("All processes have completed.")


def _write_run_summary(logdir, main_logfile, finished):
    """
    Write ``<logdir>/run_summary.csv`` and append a summary table to the main log.

    Args:
        logdir (str): Log directory.
        main_logfile (str): Path of the main log file.
        finished (list): Process-info dicts of finished processes.
    """
    import csv  # proper quoting for commands/names containing commas or quotes

    rows = []
    for process in finished:
        return_code = process['process'].poll()
        run_time = process['end_time'] - process['start_time']
        rows.append((process, return_code, run_time))

    with open(os.path.join(logdir, 'run_summary.csv'), 'w', newline='') as table_f:
        print('logging table to run_summary.csv')
        writer = csv.writer(table_f, lineterminator='\n')
        writer.writerow(["Name", "Runtime", "Exitcode", "Command", "Logfile",
                         "Manual termination", "Timed out", "Success"])
        for process, return_code, run_time in rows:
            # str(return_code) keeps "None" (csv.writer would write None as '').
            writer.writerow([process['name'], f"{run_time:.2f}", str(return_code),
                             process['command'], process['logfile'],
                             process['manual_termination'], process['timed_out'],
                             return_code == 0])

    with open(main_logfile, 'a') as main_f:
        main_f.write("---------------------------------------------------\n")
        main_f.write("Summary table:\n")
        main_f.write("---------------------------------------------------\n")
        main_f.write("Name\t\t\tRuntime\t\tExitcode\t\tCommand\t\tLogfile\t\t"
                     "Manual termination\t\tTimed out\t\tSuccess\n")
        for process, return_code, run_time in rows:
            main_f.write(f"{process['name']}\t\t\t{run_time:.2f}\t\t{return_code}\t\t"
                         f"{process['command']}\t\t{process['logfile']}\t\t"
                         f"{process['manual_termination']}\t\t{process['timed_out']}\t\t"
                         f"{return_code == 0}\n")


def launch_processes_interactive(args):
    """
    Run the commands listed in ``args.input`` in parallel, monitor them and summarize the run.

    Each non-empty line of the input file must have the form ``<command>,<name>``
    (split at the first comma); lines without a comma are reported and skipped.
    At most ``args.max`` commands run at the same time. A daemon thread accepts
    interactive ``list`` / ``cancel <name>`` / ``cancel_all`` commands. With
    logging enabled, ``main.log`` and ``run_summary.csv`` are written to
    ``args.logdir``. The module-level process lists are cleared at the end.

    Args:
        args: Namespace object with following attributes:
            input (str): Path to text file containing commands to execute
            max (int): Maximum number of processes that should run simultaneously
            verbose (bool): Whether or not to print the output of the processes
            logdir (str): Path where logfiles should be saved. '' or None if no logs should be created
            time (float): Maximum time a subprocess can run in seconds. If 0 then no timeout. Default = 0
            check_interval (float): Interval in seconds to check for termination of processes. Default = 0.1
            replace (bool): Whether or not to replace an existing main log. Default = False
            resume (bool): Whether or not to resume from existing logfiles. Default = False

    Raises:
        AssertionError: If ``args.resume`` is set but ``<logdir>/main.log`` does not exist.

    Exits the interpreter with ``sys.exit(0)`` if ``main.log`` already exists and
    neither ``resume`` nor ``replace`` is set.
    """
    # These flags are read by other functions (get_input, kill_process); without
    # the global declaration the assignments below would only create locals.
    global STOP_INPUT, LOGGING

    LOGGING = args.logdir is not None and args.logdir != ''
    main_logfile = f"{args.logdir}/main.log" if LOGGING else None

    if LOGGING:
        if args.resume and not os.path.isfile(main_logfile):
            # Explicit raise instead of `assert` so the check survives `python -O`.
            raise AssertionError(f'log file {main_logfile} does not exist - cannot resume')
        if not os.path.exists(args.logdir):
            os.makedirs(args.logdir)
        if os.path.isfile(main_logfile) and not args.resume:
            print(f'{main_logfile} already exists.')
            if not args.replace:
                sys.exit(0)
            os.remove(main_logfile)
        action = 'resuming' if args.resume else 'starting'
        with open(main_logfile, 'a') as main_f:
            main_f.write(f"\n\n{action} at {datetime.datetime.now()}\n\n")

    # Reset the stop flag before starting the thread, so that a flag left over
    # from an earlier run cannot end the new input loop immediately.
    STOP_INPUT = False
    input_thread = threading.Thread(target=get_input, daemon=True)
    input_thread.start()

    max_parallel = max(1, args.max)  # a limit below 1 would wait forever
    no_checks = 0
    with open(args.input, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            command, separator, name = line.partition(',')
            if not separator:
                print(f"Skipping line without '<command>,<name>' format: {line}")
                continue
            name = name.strip()
            # Throttle: wait until fewer than max_parallel processes are tracked.
            while len(process_infos) >= max_parallel:
                check_processes(process_infos, args, main_logfile,
                                force_write=no_checks % 100 == 0)
                time.sleep(args.check_interval)
                no_checks += 1
            process_info = launch_process(sub_command=command, name=name,
                                          log=args.logdir, max_runtime=args.time,
                                          resume=args.resume)
            if process_info is None:
                continue  # skipped: already finished in a previous run
            process_infos.append(process_info)
            print('Launched', name)

    # Monitor the remaining processes; the input thread allows cancelling them.
    while process_infos:
        check_processes(process_infos, args, main_logfile,
                        force_write=no_checks % 100 == 0)
        time.sleep(args.check_interval)
        no_checks += 1
    STOP_INPUT = True

    _print_run_summary(finished_processes)
    if LOGGING:
        _write_run_summary(args.logdir, main_logfile, finished_processes)

    # Clear module-level state so a later call in the same interpreter starts fresh.
    timers.clear()
    finished_processes.clear()
    cancelled_processes.clear()
    process_infos.clear()
if __name__ == '__main__':
    # Run multiple shell commands in parallel.
    #
    # The input is a text file in which every non-empty line holds a command and a
    # name for it, separated by a comma ("<command>,<name>"). The commands run in
    # parallel (at most --max_p at a time), each command's output is logged to its
    # own file, and the running processes are monitored so the user can list or
    # cancel them interactively.
    parser = argparse.ArgumentParser(
        description='Run the commands listed in a text file in parallel.')
    parser.add_argument('-i', '--input', type=str, required=True,
                        help='input file with commands to execute in parallel, one per line, '
                             'in the format: <command>,<name>')
    parser.add_argument('--max_p', type=int, required=False, dest='max', default=3,
                        help='Maximum number of subprocesses that should run '
                             'simultaneously (default: 3)')
    parser.add_argument('--verbose', action='store_true',
                        help="print each finished command's log to the terminal "
                             "(requires --logdir)")
    parser.add_argument('-l', '--logdir', type=str, dest="logdir", default=None,
                        help="Path where logfiles should be saved (default: no logs "
                             "are created)")
    parser.add_argument('--max_t', type=float, required=False, dest='time', default=0.0,
                        help="Maximum time a subprocess can run in seconds "
                             "(default: 0, no limit)")
    # float, not int: the default and typical values are fractions of a second.
    parser.add_argument('--check_interval', type=float, default=0.1,
                        help='interval in seconds to check for process termination '
                             '(default: 0.1)')
    parser.add_argument('--replace', action='store_true',
                        help='overwrite an existing main.log in --logdir instead of exiting')
    parser.add_argument('--resume', action='store_true',
                        help='resume a previous run in --logdir (requires an existing '
                             'main.log); commands whose log shows a finished run of at '
                             'least 5 seconds are skipped')
    # Parse first so that --help works even when FreeSurfer is not sourced.
    args = parser.parse_args()

    # Safeguard against FreeSurfer commands failing en masse: require FreeSurfer's
    # fsfast/bin directory on PATH.
    if '/fsfast/bin' not in os.environ.get('PATH', ''):
        print('FreeSurfer is not sourced - please source FreeSurfer before running this script')
        sys.exit(1)
    print('freesurfer is sourced')
    launch_processes_interactive(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment