Last active
April 9, 2022 23:57
-
-
Save hhsprings/d737de4c393a6fba282935d574112a47 to your computer and use it in GitHub Desktop.
A simple example of Python's "psutil" package (with features such as kill, pause, resume)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import print_function | |
from __future__ import unicode_literals | |
import os | |
import sys | |
import pipes | |
import re | |
import psutil # pip install psutil | |
# ANSI SGR escape sequences used to highlight parts of each output line.
_spcodes = {
    # back to the default foreground / background
    "reset": "\x1b[39;49;00m",
    # blue background for the working directory
    "pwd": "\x1b[44m",
    # bright-green background + black foreground for the command name.
    # (The two sequences must be adjacent: a ";" between them is not part
    # of either escape sequence and would be printed literally.)
    "argv0": "\x1b[102m\x1b[30m",
}
if sys.platform == "win32":
    try:
        # colorama is optional; it is only imported on Windows.
        import colorama
        colorama.init()
    except ImportError:
        # Without colorama, fall back to plain, uncoloured output by
        # blanking every escape sequence.
        _spcodes = dict.fromkeys(_spcodes, "")
def _str(s):
    """Return *s* round-tripped through the filesystem encoding.

    Characters that the encoding cannot represent are replaced by XML
    character references (``&#NNNN;``) during the encode step, so the
    returned text is always encodable with that encoding.
    """
    fs_encoding = sys.getfilesystemencoding()
    encoded = s.encode(fs_encoding, errors="xmlcharrefreplace")
    return encoded.decode(fs_encoding)
# Menu of priority classes offered for the "priority" action:
# menu key -> (constant name, psutil constant value).
try:
    _prio_nmap = {
        "{}".format(number): (const_name, getattr(psutil, const_name))
        for number, const_name in enumerate((
            "ABOVE_NORMAL_PRIORITY_CLASS",
            "BELOW_NORMAL_PRIORITY_CLASS",
            "HIGH_PRIORITY_CLASS",
            "IDLE_PRIORITY_CLASS",
            "NORMAL_PRIORITY_CLASS",
            "REALTIME_PRIORITY_CLASS",
        ), 1)
    }
except AttributeError:
    # psutil does not provide these constants here (presumably because we
    # are not running on Windows); no menu is available then.
    _prio_nmap = None
if __name__ == '__main__':
    # Command-line entry point: list the processes matching the given
    # filters and, per process, optionally suspend / resume / kill it or
    # change its priority.
    import argparse

    # NOTE(review): the `pipes` module is deprecated and absent from recent
    # Python releases; prefer shlex.quote and only fall back to pipes.quote
    # where shlex has no quote().
    try:
        from shlex import quote as _quote
    except ImportError:
        from pipes import quote as _quote

    ap = argparse.ArgumentParser()
    ap.add_argument("--iter_include_self", action="store_true")
    ap.add_argument("--iter_include_parent", action="store_true")
    ap_cmdgrp = ap.add_mutually_exclusive_group()
    ap_cmdgrp.add_argument("--nop", action="store_true")
    ap_cmdgrp.add_argument(
        "--action", choices=["suspend", "resume", "kill", "priority", "nop"])
    ap.add_argument("--search_cmdline_regexp", default=".*")
    ap.add_argument("--search_cmd_regexp", default=".*")
    ap.add_argument("--extra_cond", default="True", help="for example: 'p.pid == 45678'")
    ap.add_argument("--cmdbasename", action="store_true")
    ap.add_argument("--cwdbasename", action="store_true")
    ap.add_argument("-q", "--quiet", action="store_true", help="suppress warning")
    ap.add_argument("-y", "--yes", action="store_true")
    ap.add_argument("--line_format", default="{pid} [{pwd}] {cmdl}")
    args = ap.parse_args()

    # --nop is shorthand for --action nop; None means "ask interactively".
    action = "nop" if args.nop else args.action

    def _catcmdl(cmdl, color=True):
        """Join *cmdl* into one shell-quoted command-line string.

        With *color* true (the default) the first element is wrapped in the
        "argv0" highlight codes; with *color* false the result contains no
        escape sequences and is suitable for pattern matching.
        """
        cl = [_quote(a) for a in cmdl]
        if color:
            cl[0] = "{}{}{}".format(
                _spcodes["argv0"], cl[0], _spcodes["reset"])
        return " ".join(cl)

    ma = {
        "cmd": re.compile(args.search_cmd_regexp),
        "cmdline": re.compile(args.search_cmdline_regexp),
    }

    def _warn(*a, **kw):
        """print() to stderr (unless a file is given); silent with --quiet."""
        if args.quiet:
            return
        kw.setdefault("file", sys.stderr)
        print(*a, **kw)

    def _confirm(verb):
        """Return True if *verb* should be carried out on the process.

        Always true with --yes; otherwise the user is asked and only an
        answer of "y" (any case) counts as approval.
        """
        if args.yes:
            return True
        answer = input(
            "Are you sure to {} this process? (y/n) ".format(verb))
        return answer.lower() == "y"

    def _change_priority(proc):
        """Show the priority of *proc* and interactively set a new one."""
        orig = proc.nice()
        print("current priority is", orig)
        if _prio_nmap:
            # A menu of named priority classes is available.
            menu = " / ".join(
                "{}: {}".format(n, _prio_nmap[n][0])
                for n in sorted(_prio_nmap))
            choice = input("{}? ".format(menu))
            if choice not in _prio_nmap:
                return
            new_value = _prio_nmap[choice][1]
        else:
            choice = input("new value? ")
            try:
                new_value = int(choice)
            except ValueError:
                _warn("not an integer; priority left unchanged:", choice)
                return
        proc.nice(new_value)
        print(orig, "=>", proc.nice())

    myproc = psutil.Process(os.getpid())

    # Pids that are never listed unless explicitly requested; computed once
    # instead of querying the parent pid for every process.
    skip_pids = set()
    if not args.iter_include_self:
        skip_pids.add(myproc.pid)
    if not args.iter_include_parent:
        skip_pids.add(myproc.ppid())

    # One-letter answer -> name of the psutil.Process method to call (the
    # same word is used in the confirmation prompt).
    simple_actions = {"s": "suspend", "r": "resume", "k": "kill"}

    for p in psutil.process_iter():
        if p.pid in skip_pids:
            continue

        try:
            cmdl = p.cmdline()
        except Exception as e:
            # Best effort: a process we cannot inspect is simply skipped.
            _warn("cannot get commandline:", e)
            continue
        if not cmdl:
            continue
        if args.cmdbasename:
            cmdl[0] = os.path.basename(cmdl[0])
        tcmdl = _catcmdl(cmdl)

        try:
            # NOTE(security): --extra_cond is evaluated as arbitrary Python
            # code in this module's namespace (p, cmdl, tcmdl, psutil, ...
            # are visible to it). Only pass expressions you trust.
            extra_ok = eval(args.extra_cond)
        except psutil.Error as e:
            # e.g. the expression queried a process attribute that is no
            # longer available or not permitted; skip instead of aborting.
            _warn("cannot evaluate extra_cond:", e)
            continue
        # Match against the uncoloured command line: the escape sequences
        # in tcmdl would defeat anchors such as "^" and could match
        # patterns by accident.
        if not (extra_ok and
                ma["cmd"].search(cmdl[0]) and
                ma["cmdline"].search(_catcmdl(cmdl, color=False))):
            continue

        try:
            pwd = p.cwd()
        except Exception as e:
            _warn("cannot get working directory:", e)
            pwd = ""
        else:
            if pwd and args.cwdbasename:
                pwd = os.path.basename(pwd)
            pwd = "{}{}{}".format(
                _spcodes["pwd"], pwd, _spcodes["reset"])

        # The line is left open (end=" ") so that a prompt can follow it.
        print(args.line_format.format(
            pid=p.pid, pwd=_str(pwd), cmdl=_str(tcmdl)), end=" ")
        if action == "nop":
            print("")
            continue

        if action:
            q = action[0]
        else:
            q = input(": Suspend/Resume/Kill/Priority/Nop? (s/r/k/p/n) ")
        q = q.lower()

        try:
            if q == "p":
                _change_priority(p)
            elif q in simple_actions:
                if _confirm(simple_actions[q]):
                    getattr(p, simple_actions[q])()
        except psutil.Error as e:
            # The process may have exited or be protected; report it and
            # carry on with the remaining processes.
            print("action failed:", e, file=sys.stderr)

        if action and args.yes and q in simple_actions:
            # Batch mode showed no prompt, so nothing has ended the output
            # line yet.
            print("")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment