Skip to content

Instantly share code, notes, and snippets.

@abhijitmamarde
Created September 28, 2022 12:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abhijitmamarde/6fcb6253084eb2b8dae8a915578ac72a to your computer and use it in GitHub Desktop.
Save abhijitmamarde/6fcb6253084eb2b8dae8a915578ac72a to your computer and use it in GitHub Desktop.
Simple script to watch for specific process is over, and trigger action once it is over
"""Simple script to watch for specific process is over, and trigger action once it is over
- Waits for some start time (the process is expected to start within this time)
- Fetches all the running processes, exiting for below-mentioned conditions
- Waits for next run
- The watcher exits if:
- process is not found running for max watcher timeout, from starting
- current time is over max watcher timeout, from last process found time
Requirements:
- install 3rd party packages: `pip install psutil loguru`
- change `check_for_process_name`, `trigger_action_command`, as per your requirement
- change the timers sections
"""
import datetime
import os
import time
import os
import psutil
from loguru import logger
logger.add("automation_watcher.log")
def to_secs(hrs=0, mins=0, secs=0):
return (hrs * 60 * 60) + (mins * 60) + secs
# ----- Process name to watch and Command to trigger, change as per requirement -----
check_for_process_name = "notepad"
trigger_action_command = "echo \"TADA\" > \"D:\\output-sample.txt\""
# ------------ Timers, Timeouts, change as per requirement --------------
wait_before_first_check = to_secs(secs=1) # to_secs(mins=5)
wait_between_every_check = to_secs(secs=5) # to_secs(mins=1)
# triggers action if no process is within idle window timeout
idle_window_timeout = to_secs(secs=20) # to_secs(mins=30)
# max time the process will keep checking, after timeout, process stops!
max_watcher_timeout = to_secs(secs=20)
def main():
logger.debug(f"Waiting for first check: {wait_before_first_check}")
time.sleep(wait_before_first_check)
last_found_time = None
last_none_found_time = None
start_check_time = datetime.datetime.now().timestamp()
while True:
search_time = None
running_proc_pids = psutil.pids()
for pid in running_proc_pids:
try:
p = psutil.Process(pid)
exe = p.exe()
if check_for_process_name in exe:
name = p.name()
cmdline = p.cmdline()[0] + " " + " ".join([repr(x) for x in p.cmdline()[1:]])
logger.debug(f"Process {check_for_process_name!r} found: {pid=}, {cmdline=}")
last_found_time = datetime.datetime.now().timestamp()
search_time = datetime.datetime.now().timestamp()
except Exception as err:
print(f"ERROR fetching info for {pid=}, {err=}")
pass
current_time = datetime.datetime.now().timestamp()
if not search_time:
last_none_found_time = current_time
logger.warning(f"No running process found with {check_for_process_name}!")
if last_found_time:
logger.debug(f"{current_time=} - {last_found_time=} > {idle_window_timeout=} {(current_time - last_found_time) > idle_window_timeout}")
if (current_time - last_found_time) > idle_window_timeout:
logger.info("TRIGGERING ACTION COMMAND LOGIC!!!")
os.system(trigger_action_command)
break
else:
last_none_found_time = None
if (
((last_found_time is None) and ((current_time - start_check_time) > max_watcher_timeout)) or
((last_none_found_time is not None) and ((current_time - last_none_found_time) > max_watcher_timeout))
):
logger.info("MAX WAIT TIME reached... Process will EXIT NOW!!!")
logger.debug(f"{last_found_time=}, {last_none_found_time=}")
break
logger.debug(f"Waiting for next check: {wait_between_every_check}")
time.sleep(wait_between_every_check)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment