Skip to content

Instantly share code, notes, and snippets.

@ab
Created January 13, 2021 22:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ab/ea29332ae82a70cbd0c88967ca46ae84 to your computer and use it in GitHub Desktop.
Save ab/ea29332ae82a70cbd0c88967ca46ae84 to your computer and use it in GitHub Desktop.
Python script that prints pstree and loops forever, gracefully exiting on SIGINT/SIGTERM, useful for debugging process managers / deploy behavior
#!/usr/bin/env python3
import collections
import logging
import os
import psutil
import signal
import sys
import time
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.DEBUG)
sformat = "%(asctime)s [%(process)s] - %(name)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(sformat)
handler.setFormatter(formatter)
logger.addHandler(handler)
def print_tree(parent, tree, indent=''):
try:
name = psutil.Process(parent).name()
except psutil.Error:
name = "?"
print(parent, name)
if parent not in tree:
return
# ignore kernel threads
if name == "kthreadd":
return
children = tree[parent][:-1]
for child in children:
sys.stdout.write(indent + "|- ")
print_tree(child, tree, indent + "| ")
child = tree[parent][-1]
sys.stdout.write(indent + "`_ ")
print_tree(child, tree, indent + " ")
def pstree_main():
# construct a dict where 'values' are all the processes
# having 'key' as their parent
tree = collections.defaultdict(list)
for p in psutil.process_iter():
try:
tree[p.ppid()].append(p.pid)
except (psutil.NoSuchProcess, psutil.ZombieProcess):
pass
# on systems supporting PID 0, PID 0's parent is usually 0
if 0 in tree and 0 in tree[0]:
tree[0].remove(0)
print_tree(min(tree), tree)
class Runner(object):
def __init__(self):
self.exit = False
signal.signal(signal.SIGTERM, self.handle_signal)
signal.signal(signal.SIGINT, self.handle_signal)
logger.info(f"{__file__} startup, pid={os.getpid()}")
pstree_main()
def main(self):
logger.info("Main loop")
while not self.exit:
logger.info("mark")
time.sleep(5)
logger.info("Exiting gracefully")
def handle_signal(self, num, frame):
logger.warning(f"Received signal {num} {signal.strsignal(num)}")
self.exit = True
def log(self, message):
print(os.getpid(), message)
if __name__ == "__main__":
Runner().main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment