Skip to content

Instantly share code, notes, and snippets.

@hhsprings
Last active April 9, 2022 23:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hhsprings/d737de4c393a6fba282935d574112a47 to your computer and use it in GitHub Desktop.
Save hhsprings/d737de4c393a6fba282935d574112a47 to your computer and use it in GitHub Desktop.
A simple example of Python's "psutil" package (with features such as kill, pause, resume)
# -*- coding: utf-8 -*-
from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import pipes
import re
import psutil # pip install psutil
# ANSI SGR escape sequences used to decorate the listing.
#   reset: default foreground / default background / all attributes off
#   pwd:   blue background (wraps the working directory)
#   argv0: bright-green background + black foreground (wraps the program name)
_spcodes = {
    "reset": "\x1b[39;49;00m",
    "pwd": "\x1b[44m",
    # The two sequences must be adjacent: a ";" placed between them lies
    # outside any escape sequence and is printed literally before argv0.
    "argv0": "\x1b[102m\x1b[30m",
}
if sys.platform == "win32":
    try:
        # colorama is optional; when present, init() is called so the codes
        # above can be used on the Windows console.
        import colorama
        colorama.init()
    except ImportError:
        # No colorama: keep the same keys but blank every code, so the
        # formatting calls elsewhere still work and emit plain text.
        _spcodes = dict.fromkeys(_spcodes, "")
def _str(s):
    """Round-trip *s* through the filesystem encoding.

    Characters that the encoding reported by ``sys.getfilesystemencoding()``
    cannot represent are replaced by XML character references
    (``&#NNNN;``); everything else comes back unchanged.
    """
    fs_encoding = sys.getfilesystemencoding()
    encoded = s.encode(fs_encoding, errors="xmlcharrefreplace")
    return encoded.decode(fs_encoding)
# Menu of priority classes offered by the "priority" action:
#   menu key ("1".."6") -> (constant name, psutil constant value)
# If psutil does not expose these constants (the attribute lookup raises
# AttributeError -- maybe, not Windows) the table is None and the caller
# asks for a plain integer nice value instead.
try:
    _prio_nmap = {
        str(number): (const_name, getattr(psutil, const_name))
        for number, const_name in enumerate(
            (
                "ABOVE_NORMAL_PRIORITY_CLASS",
                "BELOW_NORMAL_PRIORITY_CLASS",
                "HIGH_PRIORITY_CLASS",
                "IDLE_PRIORITY_CLASS",
                "NORMAL_PRIORITY_CLASS",
                "REALTIME_PRIORITY_CLASS",
            ),
            1,
        )
    }
except AttributeError:
    _prio_nmap = None
if __name__ == '__main__':
    # Script entry point.
    #
    # Lists every process whose argv[0] / command line match the given
    # regular expressions (and the optional --extra_cond expression), then
    # either just prints it ("nop"), applies --action to it, or asks the
    # user interactively what to do with it.
    #
    # Everything stays at module level on purpose: --extra_cond is evaluated
    # in this namespace and may refer to `p`, `cmdl`, `tcmdl`, `myproc`, ...
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument("--iter_include_self", action="store_true")
    ap.add_argument("--iter_include_parent", action="store_true")
    ap_cmdgrp = ap.add_mutually_exclusive_group()
    ap_cmdgrp.add_argument("--nop", action="store_true")
    ap_cmdgrp.add_argument(
        "--action", choices=["suspend", "resume", "kill", "priority", "nop"])
    ap.add_argument("--search_cmdline_regexp", default=".*")
    ap.add_argument("--search_cmd_regexp", default=".*")
    ap.add_argument("--extra_cond", default="True", help="for example: 'p.pid == 45678'")
    ap.add_argument("--cmdbasename", action="store_true")
    ap.add_argument("--cwdbasename", action="store_true")
    ap.add_argument("-q", "--quiet", action="store_true", help="suppress warning")
    ap.add_argument("-y", "--yes", action="store_true")
    ap.add_argument("--line_format", default="{pid} [{pwd}] {cmdl}")
    args = ap.parse_args()

    # --nop is shorthand for --action nop; None means "ask interactively".
    action = "nop" if args.nop else args.action

    def _catcmdl(cmdl, color=True):
        """Join *cmdl* into one shell-quoted string.

        With ``color`` true (the default) the first element is wrapped in
        the "argv0" colour codes for display; with ``color`` false the plain
        text is returned, which is what regular expressions must be matched
        against.
        """
        # NOTE: pipes.quote is shlex.quote; the pipes module itself was
        # removed in Python 3.13, so the file-level import needs attention
        # on newer interpreters.
        cl = [pipes.quote(a) for a in cmdl]
        if color and cl:
            cl[0] = "{}{}{}".format(
                _spcodes["argv0"], cl[0], _spcodes["reset"])
        return " ".join(cl)

    ma = {
        "cmd": re.compile(args.search_cmd_regexp),
        "cmdline": re.compile(args.search_cmdline_regexp),
    }

    # NOTE(security): --extra_cond is arbitrary Python run through eval()
    # with full access to this module's namespace. That is the feature, but
    # it must only ever be fed expressions the operator trusts.
    # Compiled once here instead of re-parsing the text for every process.
    extra_cond = compile(args.extra_cond, "<extra_cond>", "eval")

    def _warn(*a, **kw):
        """print() to stderr unless --quiet was given."""
        if args.quiet:
            return
        kw.setdefault("file", sys.stderr)
        print(*a, **kw)

    # Letter typed at the prompt -> psutil.Process method to call.
    verbs = {"s": "suspend", "r": "resume", "k": "kill"}

    myproc = psutil.Process(os.getpid())
    # Pids to leave out of the listing (this script and/or its parent),
    # computed once rather than on every iteration.
    skip_pids = set()
    if not args.iter_include_self:
        skip_pids.add(myproc.pid)
    if not args.iter_include_parent:
        skip_pids.add(myproc.ppid())

    for p in psutil.process_iter():
        if p.pid in skip_pids:
            continue

        try:
            cmdl = p.cmdline()
        except Exception as e:
            # Best effort: processes we cannot inspect are skipped.
            _warn("cannot get commandline:", e)
            continue
        if not cmdl:
            continue
        if args.cmdbasename:
            cmdl[0] = os.path.basename(cmdl[0])

        tcmdl = _catcmdl(cmdl)  # coloured, for display
        plain_cmdl = _catcmdl(cmdl, color=False)  # uncoloured, for matching
        # The command-line pattern is matched against the plain text: the
        # coloured string has escape codes before and after argv0, which
        # defeats "^" anchors and patterns spanning argv0 and its arguments.
        if not (eval(extra_cond)
                and ma["cmd"].search(cmdl[0])
                and ma["cmdline"].search(plain_cmdl)):
            continue

        pwd = ""
        try:
            pwd = p.cwd()
            if pwd and args.cwdbasename:
                pwd = os.path.basename(pwd)
            pwd = "{}{}{}".format(
                _spcodes["pwd"], pwd, _spcodes["reset"])
        except Exception as e:
            _warn("cannot get working directory:", e)

        # The line is left open (end=" ") so a prompt can follow on it.
        print(args.line_format.format(
            pid=p.pid, pwd=_str(pwd), cmdl=_str(tcmdl)), end=" ")
        if action == "nop":
            print("")
            continue

        if action:
            q = action[0]
        else:
            q = input(": Suspend/Resume/Kill/Priority/Nop? (s/r/k/p/n) ")
        q = q.lower()

        try:
            if q == "p":
                orig = p.nice()
                print("current priority is", orig)
                if _prio_nmap:
                    choice = input("{}? ".format(" / ".join(
                        "{}: {}".format(n, _prio_nmap[n][0])
                        for n in sorted(_prio_nmap))))
                    if choice in _prio_nmap:
                        p.nice(_prio_nmap[choice][1])
                        print(orig, "=>", p.nice())
                else:
                    choice = input("new value? ")
                    try:
                        p.nice(int(choice))
                    except ValueError as e:
                        _warn("priority unchanged:", e)
                    else:
                        print(orig, "=>", p.nice())
            elif q in verbs:
                verb = verbs[q]
                if args.yes:
                    yn = "y"
                    if action:
                        # Neither prompt was shown, so nothing has ended
                        # the listing line yet.
                        print("")
                else:
                    yn = input(
                        "Are you sure to {} this process? (y/n) ".format(
                            verb)).lower()
                if yn == "y":
                    getattr(p, verb)()
            # Any other answer (including "n") leaves the process alone.
        except psutil.Error as e:
            # The process may have exited or be off-limits; report it and
            # carry on with the remaining processes instead of aborting.
            print("action failed:", e, file=sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment