Created
September 28, 2022 12:49
-
-
Save abhijitmamarde/6fcb6253084eb2b8dae8a915578ac72a to your computer and use it in GitHub Desktop.
Simple script that watches for a specific process to finish and triggers an action once it has
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Simple script that watches for a specific process to finish and triggers an action once it has.

- Waits for an initial start delay (the process is expected to start within this time)
- Fetches all running processes, exiting under the conditions listed below
- Waits until the next check
- The watcher exits if:
    - the process has not been found running within the max watcher timeout, measured from the start
    - the current time is past the max watcher timeout, measured from the last time the process was found

Requirements:
- install the 3rd-party packages: `pip install psutil loguru`
- change `check_for_process_name` and `trigger_action_command` as per your requirement
- adjust the timers section
"""
import datetime
import os
import time

import psutil
from loguru import logger
# Also write every log record to a file in the current working directory.
logger.add("automation_watcher.log")
def to_secs(hrs=0, mins=0, secs=0):
    """Return the total number of seconds in the given hours, minutes and seconds.

    Args:
        hrs: Number of hours (default 0).
        mins: Number of minutes (default 0).
        secs: Number of seconds (default 0).

    Returns:
        ``hrs * 3600 + mins * 60 + secs``.
    """
    return (hrs * 60 * 60) + (mins * 60) + secs
# ----- Process name to watch and command to trigger; change as required -----
# Substring looked for in the executable path of every running process.
check_for_process_name = "notepad"
# Shell command handed to os.system() when the action is triggered.
trigger_action_command = "echo \"TADA\" > \"D:\\output-sample.txt\""

# ------------ Timers and timeouts (in seconds); change as required --------------
wait_before_first_check = to_secs(secs=1)  # to_secs(mins=5)
wait_between_every_check = to_secs(secs=5)  # to_secs(mins=1)
# The action is triggered once the process has been seen at least once and
# has then gone unseen for longer than this idle window.
idle_window_timeout = to_secs(secs=20)  # to_secs(mins=30)
# Maximum time the watcher keeps checking; once exceeded, the watcher stops.
max_watcher_timeout = to_secs(secs=20)
def _format_cmdline(proc):
    """Return a printable command line for *proc*, for logging only.

    Never raises for psutil/OS errors: an unreadable or empty command line
    yields a placeholder so that logging cannot mask a successful match.
    """
    try:
        args = proc.cmdline()
    except (psutil.Error, OSError):
        return "<unavailable>"
    if not args:
        return "<empty>"
    return args[0] + " " + " ".join(repr(arg) for arg in args[1:])


def main():
    """Poll running processes and run the action command once the target is gone.

    After an initial delay the loop scans all processes every
    ``wait_between_every_check`` seconds, matching ``check_for_process_name``
    as a substring of each executable path. The loop ends when either:

    - the target was seen at least once and has then been unseen for more
      than ``idle_window_timeout`` seconds: ``trigger_action_command`` is run
      through ``os.system``; or
    - the target was never seen and more than ``max_watcher_timeout`` seconds
      have passed since the first scan: the watcher exits without running
      the command.
    """
    logger.debug(f"Waiting for first check: {wait_before_first_check}")
    time.sleep(wait_before_first_check)

    last_found_time = None
    last_none_found_time = None
    # A monotonic clock is used for all elapsed-time arithmetic so that
    # wall-clock adjustments cannot stretch or shrink the timeouts.
    start_check_time = time.monotonic()

    while True:
        found_this_pass = False
        for pid in psutil.pids():
            # Keep the try body to the calls that legitimately fail for
            # vanished or protected processes.
            try:
                proc = psutil.Process(pid)
                is_match = check_for_process_name in proc.exe()
            except (psutil.Error, OSError) as err:
                print(f"ERROR fetching info for {pid=}, {err=}")
                continue
            if not is_match:
                continue

            # Record the match before collecting log details, so a failure
            # while reading the command line cannot hide a running target.
            found_this_pass = True
            last_found_time = time.monotonic()
            cmdline = _format_cmdline(proc)
            logger.debug(f"Process {check_for_process_name!r} found: {pid=}, {cmdline=}")

        current_time = time.monotonic()
        if found_this_pass:
            last_none_found_time = None
        else:
            last_none_found_time = current_time
            logger.warning(f"No running process found with {check_for_process_name}!")
            if last_found_time is not None:
                logger.debug(f"{current_time=} - {last_found_time=} > {idle_window_timeout=} {(current_time - last_found_time) > idle_window_timeout}")
                if (current_time - last_found_time) > idle_window_timeout:
                    logger.info("TRIGGERING ACTION COMMAND LOGIC!!!")
                    exit_status = os.system(trigger_action_command)
                    if exit_status != 0:
                        logger.error(f"Action command exited with status {exit_status}")
                    break

        never_seen_timed_out = (
            last_found_time is None
            and (current_time - start_check_time) > max_watcher_timeout
        )
        # NOTE(review): last_none_found_time is refreshed on every miss, so
        # this difference is always 0 and the clause cannot fire as written.
        # Left unchanged: making it live would let the watcher exit without
        # triggering the action whenever max_watcher_timeout is shorter than
        # idle_window_timeout -- confirm the intended behaviour first.
        absent_timed_out = (
            last_none_found_time is not None
            and (current_time - last_none_found_time) > max_watcher_timeout
        )
        if never_seen_timed_out or absent_timed_out:
            logger.info("MAX WAIT TIME reached... Process will EXIT NOW!!!")
            logger.debug(f"{last_found_time=}, {last_none_found_time=}")
            break

        logger.debug(f"Waiting for next check: {wait_between_every_check}")
        time.sleep(wait_between_every_check)
# Run the watcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment