Skip to content

Instantly share code, notes, and snippets.

@claytonjroberts
Last active August 28, 2023 23:40
Show Gist options
  • Save claytonjroberts/f2898237065884da6ddebb1bffac66fe to your computer and use it in GitHub Desktop.
Save claytonjroberts/f2898237065884da6ddebb1bffac66fe to your computer and use it in GitHub Desktop.
IT Process Terminator
"""Code for continous termination of IT processes.
Designed for processes that are automatically uploaded to
a MacOS computer by means of serial number.
"""
import datetime as dt
import logging
import re
import time
from collections import defaultdict
from dataclasses import dataclass, field, fields
from typing import Dict, List, Optional

import psutil
from rich.logging import RichHandler
# Default, case-insensitive patterns for the names of processes to terminate.
LIST_PATTERN_TARGET_PROCESS_DEFAULT: List[re.Pattern] = [
    re.compile(expression, flags=re.IGNORECASE)
    for expression in (
        r"jamf[a-z]*",  # Jamf software
        r"amp[a-z]*",  # Cisco AMP
        r"ac[nu][a-z]+",  # NOTE(review): presumably further Cisco agents - confirm
        r"ciscod",  # Cisco Software
    )
]
# Default, case-insensitive patterns for the owners of processes to terminate.
LIST_PATTERN_TARGET_USER_DEFAULT: List[re.Pattern] = [
    re.compile(expression, flags=re.IGNORECASE)
    for expression in (
        r"nobody",  # Nobody user, used by Jamf
    )
]
# Length, in seconds, of the default reporting window (5 minutes).
# ``total_seconds()`` is used rather than ``.seconds`` because the latter is
# only the seconds *component* of the timedelta and drops whole days.
TIME_HISTORICAL_SECONDS = int(dt.timedelta(minutes=5).total_seconds())
def _new_kill_history() -> Dict[str, Dict[str, int]]:
    """Return an empty ``{user: {process name: kill count}}`` tally."""
    # The outer factory must be a callable; passing a defaultdict *instance*
    # raises TypeError.
    return defaultdict(lambda: defaultdict(int))


@dataclass
class Assassin:
    """Repeatedly kill processes whose name or owner matches a target pattern.

    Attributes:
        list_pattern_target_process: Patterns tried with ``re.match`` against
            each process name.
        list_pattern_target_user: Patterns tried with ``re.match`` against each
            process owner. Empty by default.
        time_interval_check_seconds: Pause between two scans of the process
            table.
        time_interval_historical_seconds: Length of the reporting window; once
            it has elapsed the interval tally is logged and cleared.
        logger: Logger that receives every message of this instance.
        log_handler: Handler attached to ``logger`` on construction.
        history_process_killed_interval: Kill counts of the current window,
            keyed by user and then by process name.
        history_process_killed_total: Kill counts since construction, same
            layout.
        option_log_kill_immediately: When true, every kill is logged as it
            happens.
        dt_historial_start: Start of the current reporting window.
    """

    list_pattern_target_process: List[re.Pattern] = field(
        # Copy, so that mutating one instance's list never alters the module
        # level default shared by every other instance.
        default_factory=lambda: list(LIST_PATTERN_TARGET_PROCESS_DEFAULT)
    )
    list_pattern_target_user: List[re.Pattern] = field(default_factory=list)
    time_interval_check_seconds: float = 1 / 4
    time_interval_historical_seconds: float = TIME_HISTORICAL_SECONDS
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))
    log_handler: logging.Handler = field(
        default_factory=lambda: RichHandler(
            show_time=False, show_path=False, markup=True, rich_tracebacks=True
        )
    )
    history_process_killed_interval: Dict[str, Dict[str, int]] = field(
        default_factory=_new_kill_history
    )
    history_process_killed_total: Dict[str, Dict[str, int]] = field(
        default_factory=_new_kill_history
    )
    option_log_kill_immediately: bool = False
    # A factory, so the window starts when the instance is created rather than
    # when the class body is first evaluated.
    dt_historial_start: dt.datetime = field(default_factory=dt.datetime.now)

    def __post_init__(self):
        """Attach the handler to the logger and make INFO reports visible."""
        self.logger.addHandler(self.log_handler)
        # A logger without its own level inherits the effective level of its
        # ancestors (WARNING for an unconfigured root logger), which would
        # filter out every report below. Only step in when nobody configured
        # this logger and INFO would otherwise be dropped.
        if (
            self.logger.level == logging.NOTSET
            and self.logger.getEffectiveLevel() > logging.INFO
        ):
            self.logger.setLevel(logging.INFO)
        # Handlers have no ``debug``/``info`` methods; messages go through the
        # logger, which dispatches them to the attached handler.
        self.logger.debug(f"Target processes: {self.list_pattern_target_process}")

    @property
    def total_killed(self) -> int:
        """Total number of processes killed."""
        return sum(
            count
            for per_user in self.history_process_killed_total.values()
            for count in per_user.values()
        )

    def confirm_kill(
        self,
        process: psutil.Process,
        process_name: Optional[str] = None,
        process_user: Optional[str] = None,
    ):
        """Add process info to tally.

        Args:
            process: The process that was killed.
            process_name: Name captured before the kill. Looked up on
                ``process`` when omitted.
            process_user: Owner captured before the kill. Looked up on
                ``process`` when omitted.

        Raises:
            psutil.NoSuchProcess: If a value has to be looked up and the
                process is already gone.
            psutil.AccessDenied: If a value has to be looked up and the lookup
                is not permitted.
        """
        if process_name is None:
            process_name = process.name()
        if process_user is None:
            process_user = process.username()

        if self.option_log_kill_immediately:
            self.logger.info(f"Killed {process_name} ({process_user})")

        for history in (
            self.history_process_killed_interval,
            self.history_process_killed_total,
        ):
            history[process_user][process_name] += 1

    def reset_history_interval(self):
        """Reset the interval history."""
        self.history_process_killed_interval = _new_kill_history()

    def kill_targets(self):
        """Kill all processes that match the target patterns.

        Scans the process table forever, sleeping
        ``time_interval_check_seconds`` between scans, and never returns
        normally. When the loop is left through an exception (for example
        ``KeyboardInterrupt``) the grand total is logged and the exception
        propagates to the caller.
        """
        try:
            while True:
                self._kill_matching_processes()
                self._report_interval_if_due()
                time.sleep(self.time_interval_check_seconds)
        finally:
            self.logger.info(f"Killed {self.total_killed:,} total processes.")

    def _is_target(self, process_name: str, process_user: str) -> bool:
        """Return whether the name or the owner matches any target pattern."""
        return any(
            re.match(pattern, process_name)
            for pattern in self.list_pattern_target_process
        ) or any(
            re.match(pattern, process_user)
            for pattern in self.list_pattern_target_user
        )

    def _kill_matching_processes(self):
        """Scan the process table once and kill every matching process."""
        for process in psutil.process_iter():
            try:
                process_name: str = process.name()
                process_user: str = process.username()
                if not self._is_target(process_name, process_user):
                    continue
                process.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process ended, or may not be inspected/killed by this user:
                # skip it instead of aborting the whole loop.
                continue
            # Hand over the values read *before* the kill; querying the
            # process afterwards can fail because it no longer exists.
            self.confirm_kill(process, process_name, process_user)

    def _report_interval_if_due(self):
        """Log and clear the interval tally once the reporting window is over."""
        window = dt.timedelta(seconds=self.time_interval_historical_seconds)
        if dt.datetime.now() - self.dt_historial_start <= window:
            return
        # Report first, reset afterwards; otherwise the tally is always empty.
        self.logger.info(
            f"Total killed in the last {self.time_interval_historical_seconds} seconds:\n"
            f"{ {user: dict(counts) for user, counts in self.history_process_killed_interval.items()} }"
        )
        self.reset_history_interval()
        self.dt_historial_start = dt.datetime.now()
if __name__ == "__main__":
    # Script entry point: run the kill loop with the default configuration.
    Assassin().kill_targets()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment