Last active
August 28, 2023 23:40
-
-
Save claytonjroberts/f2898237065884da6ddebb1bffac66fe to your computer and use it in GitHub Desktop.
IT Process Terminator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Code for continous termination of IT processes. | |
Designed for processes that are automatically uploaded to | |
a MacOS computer by means of serial number. | |
""" | |
import psutil | |
import time | |
import re | |
import datetime as dt | |
import logging | |
from typing import List, Dict | |
from dataclasses import dataclass, field, fields | |
from collections import defaultdict | |
from rich.logging import RichHandler | |
# Default process-name patterns. They are compiled case-insensitively and are
# meant to be applied with ``match`` (anchored at the start of the name).
LIST_PATTERN_TARGET_PROCESS_DEFAULT: List[re.Pattern] = [
    re.compile(_pattern, re.IGNORECASE)
    for _pattern in [
        r"jamf[a-z]*",  # Jamf software
        r"amp[a-z]*",  # Cisco AMP
        r"ac[nu][a-z]+",
        r"ciscod",  # Cisco Software
    ]
]

# Default user-name patterns, compiled case-insensitively.
LIST_PATTERN_TARGET_USER_DEFAULT: List[re.Pattern] = [
    re.compile(_pattern, re.IGNORECASE)
    for _pattern in [r"nobody"]  # Nobody user, used by Jamf
]

# Length of one reporting interval, in whole seconds.
# ``timedelta.seconds`` is only the seconds *component* (0..86399) and wraps
# for durations of a day or more; ``total_seconds()`` is the full duration.
TIME_HISTORICAL_SECONDS = int(dt.timedelta(minutes=5).total_seconds())
def _new_kill_history() -> Dict[str, Dict[str, int]]:
    """Return an empty ``{user: {process name: kill count}}`` tally."""
    # The outer factory must be a callable; passing ``defaultdict(int)``
    # (an instance) to ``defaultdict`` raises ``TypeError``.
    return defaultdict(lambda: defaultdict(int))


@dataclass
class Assassin:
    """Repeatedly scan the process table and kill matching processes.

    A process is a target when its name matches any pattern in
    ``list_pattern_target_process`` or its owning username matches any
    pattern in ``list_pattern_target_user``. Patterns are applied with
    ``re.match`` and are therefore anchored at the start of the string.

    Attributes:
        list_pattern_target_process: Patterns tested against process names.
        list_pattern_target_user: Patterns tested against process usernames
            (empty by default).
        time_interval_check_seconds: Sleep time between two scans.
        time_interval_historical_seconds: Length of one reporting interval.
        logger: Logger that receives every message of this class.
        log_handler: Handler attached to ``logger`` in ``__post_init__``.
        history_process_killed_interval: Kill tally for the current
            reporting interval, ``{user: {process name: count}}``.
        history_process_killed_total: Kill tally since creation, same shape.
        option_log_kill_immediately: Log each kill as soon as it is recorded.
        dt_historial_start: Start time of the current reporting interval.
    """

    # Copy the module-level default so instances never share (and mutate)
    # the same list object.
    list_pattern_target_process: List[re.Pattern] = field(
        default_factory=lambda: list(LIST_PATTERN_TARGET_PROCESS_DEFAULT)
    )
    list_pattern_target_user: List[re.Pattern] = field(default_factory=list)
    time_interval_check_seconds: float = 1 / 4
    time_interval_historical_seconds: float = TIME_HISTORICAL_SECONDS
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))
    log_handler: logging.Handler = field(
        default_factory=lambda: RichHandler(
            show_time=False, show_path=False, markup=True, rich_tracebacks=True
        )
    )
    history_process_killed_interval: Dict[str, Dict[str, int]] = field(
        default_factory=_new_kill_history
    )
    history_process_killed_total: Dict[str, Dict[str, int]] = field(
        default_factory=_new_kill_history
    )
    option_log_kill_immediately: bool = False
    # A factory, so the timestamp is taken per instance rather than once
    # when the class body is executed.
    dt_historial_start: dt.datetime = field(default_factory=dt.datetime.now)

    def __post_init__(self):
        """Attach the handler to the logger and make INFO records visible."""
        if self.log_handler not in self.logger.handlers:
            self.logger.addHandler(self.log_handler)
        # With no explicit level the logger inherits the root default
        # (WARNING), which would silently drop every report of this class.
        if self.logger.level == logging.NOTSET and not self.logger.isEnabledFor(
            logging.INFO
        ):
            self.logger.setLevel(logging.INFO)
        # Messages go through the logger; ``logging.Handler`` has no
        # ``debug``/``info`` methods.
        self.logger.debug("Target processes: %s", self.list_pattern_target_process)

    @property
    def total_killed(self) -> int:
        """Total number of processes killed."""
        return sum(
            count
            for per_user in self.history_process_killed_total.values()
            for count in per_user.values()
        )

    def confirm_kill(
        self,
        process: psutil.Process,
        process_name: str = "",
        process_user: str = "",
    ):
        """Add process info to tally.

        Args:
            process: The process that was killed.
            process_name: Name read before the kill. When empty it is
                queried from ``process``, which may raise
                ``psutil.NoSuchProcess`` because the process is already gone.
            process_user: Username read before the kill; same fallback.
        """
        name = process_name or process.name()
        user = process_user or process.username()
        if self.option_log_kill_immediately:
            self.logger.info("Killed %s (%s)", name, user)
        for tally in (
            self.history_process_killed_interval,
            self.history_process_killed_total,
        ):
            tally[user][name] += 1

    def reset_history_interval(self):
        """Reset the interval history."""
        # ``dataclasses.fields`` returns a tuple and cannot be indexed by
        # field name, so the tally is rebuilt directly.
        self.history_process_killed_interval = _new_kill_history()

    def _is_target(self, process_name: str, process_user: str) -> bool:
        """Return True when the name or the username matches a target pattern."""
        return any(
            re.match(pattern, process_name)
            for pattern in self.list_pattern_target_process
        ) or any(
            re.match(pattern, process_user)
            for pattern in self.list_pattern_target_user
        )

    def _report_interval(self):
        """Log the interval tally and the running total, then start a new interval."""
        # Plain dicts keep the message free of the defaultdict factory repr.
        summary = {
            user: dict(per_name)
            for user, per_name in self.history_process_killed_interval.items()
        }
        self.logger.info(
            "Total killed in %s:\n%s", self.time_interval_historical_seconds, summary
        )
        self.logger.info("Killed %s total processes.", format(self.total_killed, ","))
        # Reset only after reporting; resetting first would always log an
        # empty tally.
        self.reset_history_interval()
        self.dt_historial_start = dt.datetime.now()

    def kill_targets(self):
        """Kill all processes that match the target patterns.

        Runs forever: scans every process, kills the targets, emits a
        report once per ``time_interval_historical_seconds`` and sleeps
        ``time_interval_check_seconds`` between scans.
        """
        while True:
            for process in psutil.process_iter(attrs=None, ad_value=None):
                try:
                    # Read identity first: after ``kill`` the process may no
                    # longer be queryable.
                    process_name: str = process.name()
                    process_user: str = process.username()
                    if not self._is_target(process_name, process_user):
                        continue
                    process.kill()
                except psutil.NoSuchProcess:
                    # Process ended, continue
                    continue
                except psutil.AccessDenied:
                    # Not permitted to inspect or kill this process; keep
                    # scanning instead of aborting the whole loop.
                    self.logger.debug("Access denied for pid %s", process.pid)
                    continue
                self.confirm_kill(process, process_name, process_user)
            elapsed = dt.datetime.now() - self.dt_historial_start
            if elapsed > dt.timedelta(seconds=self.time_interval_historical_seconds):
                self._report_interval()
            time.sleep(self.time_interval_check_seconds)
# Script entry point: build an Assassin with its default settings and start
# the kill loop.
if __name__ == "__main__":
    Assassin().kill_targets()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
psutil | |
rich |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment