Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Created April 23, 2019 05:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awesomebytes/1f22ee481c71841ad00337a1f15bc9bc to your computer and use it in GitHub Desktop.
Save awesomebytes/1f22ee481c71841ad00337a1f15bc9bc to your computer and use it in GitHub Desktop.
Monitor the CPU and memory usage of a process, given the process name
#!/tmp/gentoo/usr/bin/env python
import subprocess
import tempfile
import os
import sys
import signal
import time
import re
import rospy
import numpy as np
import csv
class ShellCmd:
    """Spawn a shell command in the background and keep track of it.

    The command is run through the shell in its own session (``os.setsid``),
    so the shell and everything it starts share one process group that can
    be signalled as a whole. stdout and stderr are redirected to temporary
    files which can be read back at any time with get_stdout() and
    get_stderr(); stdin is an empty temporary file.
    """

    def __init__(self, cmd):
        """Start ``cmd`` (a shell command line string) without waiting."""
        self.retcode = None
        self.outf = tempfile.NamedTemporaryFile(mode="w")
        self.errf = tempfile.NamedTemporaryFile(mode="w")
        self.inf = tempfile.NamedTemporaryFile(mode="r")
        self.process = subprocess.Popen(cmd, shell=True, stdin=self.inf,
                                        stdout=self.outf, stderr=self.errf,
                                        preexec_fn=os.setsid, close_fds=True)

    def __del__(self):
        """Kill the command if it is still running and close the temp files."""
        # __init__ may have failed part-way (e.g. Popen raised), in which
        # case some attributes do not exist; never raise from a finalizer.
        if getattr(self, "process", None) is not None and not self.is_done():
            self.kill()
        for attr in ("outf", "errf", "inf"):
            handle = getattr(self, attr, None)
            if handle is not None:
                handle.close()

    def get_stdout(self):
        """Return everything the command has written to stdout so far."""
        with open(self.outf.name, "r") as f:
            return f.read()

    def get_stderr(self):
        """Return everything the command has written to stderr so far."""
        with open(self.errf.name, "r") as f:
            return f.read()

    def get_retcode(self):
        """Get retcode or None if still running.

        The value is cached once known; it is -1 if the command was
        stopped through kill().
        """
        if self.retcode is None:
            self.retcode = self.process.poll()
        return self.retcode

    def is_done(self):
        """Return True once the command has finished or has been killed."""
        return self.get_retcode() is not None

    def is_succeeded(self):
        """Check if the process ended with success state (retcode 0).

        If the process hasn't finished yet this will be False.
        """
        return self.get_retcode() == 0

    def kill(self, sig=signal.SIGINT):
        """Send ``sig`` to the command's process group and wait for it.

        The retcode is set to -1 when a running command is killed. If the
        command has already exited, nothing is signalled and its real exit
        code is kept.
        """
        import errno

        if self.process.poll() is not None:
            # Already exited and reaped: the process group no longer exists
            # and os.killpg() would fail with ESRCH.
            if self.retcode is None:
                self.retcode = self.process.returncode
            return
        self.retcode = -1
        try:
            os.killpg(self.process.pid, sig)
        except OSError as err:
            # The group can vanish between poll() and killpg(); anything
            # other than "no such process" is a genuine error.
            if err.errno != errno.ESRCH:
                raise
        self.process.wait()

    def kill_forced(self):
        """Stop the command with SIGTERM (note: this is not SIGKILL)."""
        self.kill(sig=signal.SIGTERM)
def get_average_from_top_strs(top_strs):
    """Average the CPU and resident memory over several top process lines.

    :param top_strs: iterable of top process lines, each one parseable by
        get_result_from_top_str().
    :return: tuple ``(cpu_pct, mem_bytes, mem_kb, mem_mb)`` with the mean
        %CPU and the mean resident memory expressed in bytes, KiB and MiB.

    The memory value of each line is taken to be in KiB, which is the
    default unit of top's RES column (procps-ng); the previous code treated
    it as bytes, which made the reported MB figure about 1000 times too
    small. An empty ``top_strs`` yields NaN averages.
    """
    samples = [get_result_from_top_str(top_str) for top_str in top_strs]
    cpupct_avg = np.average([cpupct for cpupct, _ in samples])
    mem_avg_kib = np.average([mem for _, mem in samples])
    return cpupct_avg, mem_avg_kib * 1024.0, mem_avg_kib, mem_avg_kib / 1024.0
def _top_mem_to_kib(field):
    """Convert a top memory column (e.g. '36800', '1.5g') to a float in KiB."""
    # top abbreviates values that do not fit the column with a unit suffix;
    # the multipliers below are relative to the default KiB unit.
    scale = {"k": 1.0, "m": 1024.0, "g": 1024.0 ** 2,
             "t": 1024.0 ** 3, "p": 1024.0 ** 4, "e": 1024.0 ** 5}
    text = field.strip().replace(",", ".")  # tolerate decimal-comma locales
    suffix = text[-1:].lower()
    if suffix in scale:
        return float(text[:-1]) * scale[suffix]
    return float(text)


def get_result_from_top_str(top_str):
    """Extract the CPU percentage and resident memory from a top process line.

    Example line:
    ' 8447 sam 20 0 746148 36800 26968 S 40.0 0.2 0:01.55 republish'

    :param top_str: one process line of ``top -b`` output with the default
        12 columns (PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND).
    :return: tuple ``(cpu_percent, resident_memory)`` as floats; the memory
        is the RES column, scaled to KiB if top printed a unit suffix.
    :raises ValueError: if the line has fewer than 12 columns or the
        numeric fields cannot be parsed.
    """
    # Split at most 11 times so a COMMAND containing spaces stays in one piece
    elements = top_str.split(None, 11)
    if len(elements) != 12:
        raise ValueError("Unexpected top line (expected 12 columns): "
                         + repr(top_str))
    ram = elements[5]
    cpupct = elements[8]
    return float(cpupct.replace(",", ".")), _top_mem_to_kib(ram)
def get_top_stats(process_name, top_capture_time=30.0, top_capture_hz=4):
    """Sample a process with top and print/return its average CPU and memory.

    :param process_name: name (as matched by ``ps aux | grep``) of the
        process to monitor.
    :param top_capture_time: how long to keep sampling, in seconds.
    :param top_capture_hz: maximum number of samples per second; a value
        <= 0 disables throttling and samples as fast as top can run.
    :return: tuple ``(avg_cpu, avg_mem_b, avg_mem_kb, avg_mem_mb)`` as
        returned by get_average_from_top_strs().
    :raises RuntimeError: if top never produced a process line (for
        instance because no matching process is running).
    """
    # Time budget per sample so the requested capture rate is honoured
    period = 1.0 / top_capture_hz if top_capture_hz > 0 else 0.0
    top_captures = []
    ini_t = time.time()
    while time.time() - ini_t <= top_capture_time:
        sample_start = time.time()
        # We run top -p PID -bn 1
        # On the program name we are given
        # taking out of the results of ps aux the grep process itself
        # and this own script (as it will contain the process name as argument)
        cmd = "top -p `ps aux | grep " + process_name + " | grep -v grep | grep -v '" + sys.argv[0] + " " + process_name + "' | awk '{print $2}'` -bn 1"
        topcmd = ShellCmd(cmd)
        while not topcmd.is_done():
            time.sleep(0.01)
        stderr_text = topcmd.get_stderr()
        if stderr_text:
            print("stderr: " + str(stderr_text))
        # Process lines start with the numeric PID; header/summary lines and
        # empty output (process not found) are skipped instead of crashing.
        process_lines = []
        for line in topcmd.get_stdout().split('\n'):
            fields = line.split()
            if fields and fields[0].isdigit():
                process_lines.append(line)
        if process_lines:
            # The last process line is the one with the interesting stuff
            top_captures.append(process_lines[-1])
        remaining = period - (time.time() - sample_start)
        if remaining > 0:
            time.sleep(remaining)
    if not top_captures:
        raise RuntimeError("No top output captured for process '"
                           + str(process_name) + "'; is it running?")
    avg_cpu, avg_mem_b, avg_mem_kb, avg_mem_mb = get_average_from_top_strs(
        top_captures)
    print("Process " + str(process_name))
    print("Average CPU: " + str(round(avg_cpu, 2)) + " %")
    print("Average memory: " + str(round(avg_mem_mb, 2)) + " MB")
    return avg_cpu, avg_mem_b, avg_mem_kb, avg_mem_mb
if __name__ == '__main__':
    # Command line entry point:
    #   script process_name [time_to_capture] [hz_to_capture]
    if len(sys.argv) < 2:
        print("Usage:")
        print(sys.argv[0] + " process_name [time_to_capture] [hz_to_capture]")
        print("                   (default 30.0s)    (default 4hz)")
        sys.exit(0)
    process_name = sys.argv[1]
    # The optional arguments advertised in the usage text were previously
    # ignored; forward them when given, otherwise keep the documented defaults.
    capture_time = float(sys.argv[2]) if len(sys.argv) > 2 else 30.0
    capture_hz = float(sys.argv[3]) if len(sys.argv) > 3 else 4
    get_top_stats(process_name,
                  top_capture_time=capture_time,
                  top_capture_hz=capture_hz)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment