Skip to content

Instantly share code, notes, and snippets.

@Hzzone
Last active November 9, 2019 02:32
Show Gist options
  • Save Hzzone/ae395ee1fbedd1faf3c78b30b4a759ce to your computer and use it in GitHub Desktop.
Save Hzzone/ae395ee1fbedd1faf3c78b30b4a759ce to your computer and use it in GitHub Desktop.
A basic gpu query util for python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Implementation of gpustat
@author Jongwook Choi
@url https://github.com/wookayin/gpustat
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
import locale
import os.path
import platform
import sys
import time
from datetime import datetime
from six.moves import cStringIO as StringIO
import psutil
import pynvml as N
from blessings import Terminal
import os.path
'''
Requirements:
six>=1.7
nvidia-ml-py3
psutil
blessings>=1.6

Example usage:
gpustat.new_query()[0].entry
'''
def bytes2human(in_bytes):
    """
    Format a byte count as a compact human-readable string.

    The input is converted with ``int()`` and then repeatedly divided by
    1024 (rounding down) until it fits in four digits or the largest known
    unit (PB) has been reached.

    :param in_bytes: number of bytes (anything accepted by ``int()``).
    :return: a string such as ``'9999B'``, ``'9KB'`` or ``'5120MB'``.
    """
    suffixes = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    suffix = 0
    result = int(in_bytes)
    # Stop at the last suffix: shifting once more would express the value in
    # a unit that has no suffix, making the printed number 1024x too small.
    while result > 9999 and suffix < len(suffixes) - 1:
        result >>= 10
        suffix += 1
    return "%d%s" % (result, suffixes[suffix])
def prettify_commandline(cmdline, color_command='', color_text=''):
    """
    Render a command line as a single string, highlighting the executable.

    :param cmdline: either an already-joined string (returned untouched) or a
        non-empty sequence of argument strings.
    :param color_command: prefix inserted right before the basename of the
        executable (``cmdline[0]``).
    :param color_text: prefix used for everything else; it is emitted at the
        very start and again right after the basename.
    :return: the arguments joined by single spaces.
    """
    # cmdline: Iterable[str]
    if isinstance(cmdline, str):
        return cmdline
    assert cmdline

    directory, executable = os.path.split(cmdline[0])
    highlighted = os.path.join(
        directory, color_command + executable + color_text
    )
    # First element is the decorated executable; the remaining arguments
    # follow unchanged.
    pieces = [color_text + highlighted]
    pieces.extend(cmdline[1:])
    return ' '.join(pieces)
# Placeholder text emitted when a GPU does not report its process list.
NOT_SUPPORTED = 'Not Supported'
# Number of bytes in one megabyte (2**20); divisor for byte -> MB conversion.
MB = 1024 ** 2
class GPUStat(object):
    """
    Status snapshot of a single GPU, backed by a plain dict (``entry``).

    The dict is expected to provide the keys read by the properties below
    ('index', 'uuid', 'name', 'temperature.gpu', 'fan.speed',
    'utilization.gpu', 'power.draw', 'enforced.power.limit', 'memory.used',
    'memory.total', 'processes').
    """

    def __init__(self, entry):
        """
        :param entry: dict holding the raw GPU information.
        :raises TypeError: if ``entry`` is not a dict.
        """
        if not isinstance(entry, dict):
            raise TypeError(
                'entry should be a dict, {} given'.format(type(entry))
            )
        self.entry = entry

    def __repr__(self):
        """Return the one-line display produced by print_to()."""
        return self.print_to(StringIO()).getvalue()

    def keys(self):
        """Return the keys of the underlying entry dict."""
        return self.entry.keys()

    def __getitem__(self, key):
        """Return the raw entry value stored under ``key``."""
        return self.entry[key]

    @property
    def index(self):
        """
        Returns the index of GPU (as in nvidia-smi).
        """
        return self.entry['index']

    @property
    def uuid(self):
        """
        Returns the uuid returned by nvidia-smi,
        e.g. GPU-12345678-abcd-abcd-uuid-123456abcdef
        """
        return self.entry['uuid']

    @property
    def name(self):
        """
        Returns the name of GPU card (e.g. Geforce Titan X)
        """
        return self.entry['name']

    @property
    def memory_total(self):
        """
        Returns the total memory (in MB) as an integer.
        """
        return int(self.entry['memory.total'])

    @property
    def memory_used(self):
        """
        Returns the occupied memory (in MB) as an integer.
        """
        return int(self.entry['memory.used'])

    @property
    def memory_free(self):
        """
        Returns the free (available) memory (in MB) as an integer.
        The value is clamped so that it is never negative.
        """
        v = self.memory_total - self.memory_used
        return max(v, 0)

    @property
    def memory_available(self):
        """
        Returns the available memory (in MB) as an integer.
        Alias of memory_free.
        """
        return self.memory_free

    @property
    def temperature(self):
        """
        Returns the temperature (in celsius) of GPU as an integer,
        or None if the information is not available.
        """
        v = self.entry['temperature.gpu']
        return int(v) if v is not None else None

    @property
    def fan_speed(self):
        """
        Returns the fan speed percentage (0-100) of maximum intended speed
        as an integer, or None if the information is not available.
        """
        v = self.entry['fan.speed']
        return int(v) if v is not None else None

    @property
    def utilization(self):
        """
        Returns the GPU utilization (in percent),
        or None if the information is not available.
        """
        v = self.entry['utilization.gpu']
        return int(v) if v is not None else None

    @property
    def power_draw(self):
        """
        Returns the GPU power usage in Watts,
        or None if the information is not available.
        """
        v = self.entry['power.draw']
        return int(v) if v is not None else None

    @property
    def power_limit(self):
        """
        Returns the (enforced) GPU power limit in Watts,
        or None if the information is not available.
        """
        v = self.entry['enforced.power.limit']
        return int(v) if v is not None else None

    @property
    def processes(self):
        """
        Get the list of running processes on the GPU.
        """
        return self.entry['processes']

    def print_to(self, fp,
                 with_colors=True,    # deprecated arg
                 show_cmd=False,
                 show_full_cmd=False,
                 show_user=False,
                 show_pid=False,
                 show_power=None,
                 show_fan_speed=None,
                 gpuname_width=16,
                 term=None,
                 ):
        """
        Write a one-line (optionally multi-line) summary of this GPU to fp.

        :param fp: object with a ``write()`` method receiving the text.
        :param with_colors: if false, every color code is replaced by ''.
        :param show_cmd: show the command name of each process; the user
            name is then shown only if ``show_user`` is also set.
        :param show_full_cmd: append one extra line per process with its
            pid, CPU usage, CPU memory usage and full command line.
        :param show_user: show the user name even when ``show_cmd`` is set.
        :param show_pid: append the pid to each process.
        :param show_power: if truthy, show the power draw; when it is True
            or contains 'limit', the enforced power limit is shown as well.
        :param show_fan_speed: if truthy, show the fan speed.
        :param gpuname_width: format width used for the GPU name column.
        :param term: object providing the color attributes (a blessings
            ``Terminal``); a new ``Terminal()`` is created when None.
        :return: ``fp``, so that calls can be chained.
        """
        if term is None:
            # Resolved per call rather than in the signature, so that no
            # terminal is created as a side effect of importing this module.
            term = Terminal()

        # color settings
        colors = {}

        def _conditional(cond_fn, true_value, false_value,
                         error_value=term.bold_black):
            # Pick a color by a predicate; any failure while evaluating it
            # (e.g. comparing a missing value) selects error_value.
            try:
                return true_value if cond_fn() else false_value
            except Exception:
                return error_value

        colors['C0'] = term.normal
        colors['C1'] = term.cyan
        colors['CName'] = term.blue
        colors['CTemp'] = _conditional(lambda: self.temperature < 50,
                                       term.red, term.bold_red)
        colors['FSpeed'] = _conditional(lambda: self.fan_speed < 30,
                                        term.cyan, term.bold_cyan)
        colors['CMemU'] = term.bold_yellow
        colors['CMemT'] = term.yellow
        colors['CMemP'] = term.yellow
        colors['CCPUMemU'] = term.yellow
        colors['CUser'] = term.bold_black   # gray
        colors['CUtil'] = _conditional(lambda: self.utilization < 30,
                                       term.green, term.bold_green)
        colors['CCPUUtil'] = term.green
        colors['CPowU'] = _conditional(
            lambda: float(self.power_draw) / self.power_limit < 0.4,
            term.magenta, term.bold_magenta
        )
        colors['CPowL'] = term.magenta
        colors['CCmd'] = term.color(24)   # a bit dark

        if not with_colors:
            for k in list(colors.keys()):
                colors[k] = ''

        def _repr(v, none_value='??'):
            # Substitute a placeholder for values that are not available.
            return none_value if v is None else v

        # build one-line display information
        # we want power use optional, but if deserves being grouped with
        # temperature and utilization
        reps = u"%(C1)s[{entry[index]}]%(C0)s " \
            "%(CName)s{entry[name]:{gpuname_width}}%(C0)s |" \
            "%(CTemp)s{entry[temperature.gpu]:>3}°C%(C0)s, "

        if show_fan_speed:
            reps += "%(FSpeed)s{entry[fan.speed]:>3} %%%(C0)s, "

        reps += "%(CUtil)s{entry[utilization.gpu]:>3} %%%(C0)s"

        if show_power:
            reps += ", %(CPowU)s{entry[power.draw]:>3}%(C0)s "
            if show_power is True or 'limit' in show_power:
                reps += "/ %(CPowL)s{entry[enforced.power.limit]:>3}%(C0)s "
                reps += "%(CPowL)sW%(C0)s"
            else:
                reps += "%(CPowU)sW%(C0)s"

        reps += " | %(C1)s%(CMemU)s{entry[memory.used]:>5}%(C0)s " \
            "/ %(CMemT)s{entry[memory.total]:>5}%(C0)s MB"
        # First pass (%) fills in the colors, second pass (format) the values.
        reps = (reps) % colors
        reps = reps.format(entry={k: _repr(v) for k, v in self.entry.items()},
                           gpuname_width=gpuname_width)
        reps += " |"

        def process_repr(p):
            # Short form appended to the GPU line: user[:command][/pid](mem)
            r = ''
            if not show_cmd or show_user:
                r += "{CUser}{}{C0}".format(
                    _repr(p['username'], '--'), **colors
                )
            if show_cmd:
                if r:
                    r += ':'
                r += "{C1}{}{C0}".format(
                    _repr(p.get('command', p['pid']), '--'), **colors
                )

            if show_pid:
                r += ("/%s" % _repr(p['pid'], '--'))
            r += '({CMemP}{}M{C0})'.format(
                _repr(p['gpu_memory_usage'], '?'), **colors
            )
            return r

        def full_process_info(p):
            # Detailed per-process line shown below the GPU line.
            r = "{C0} ├─ {:>6} ".format(
                _repr(p['pid'], '--'), **colors
            )
            # '{:4.0f}' only accepts numbers, so the '--' placeholder for a
            # missing CPU percentage has to be formatted separately.
            cpu_percent = p['cpu_percent']
            if cpu_percent is None:
                cpu_percent_repr = '{:>4}'.format('--')
            else:
                cpu_percent_repr = '{:4.0f}'.format(cpu_percent)
            r += "{C0}({CCPUUtil}{}%{C0}, {CCPUMemU}{:>6}{C0})".format(
                cpu_percent_repr,
                bytes2human(_repr(p['cpu_memory_usage'], 0)), **colors
            )
            full_command_pretty = prettify_commandline(
                p['full_command'], colors['C1'], colors['CCmd'])
            r += "{C0}: {CCmd}{}{C0}".format(
                _repr(full_command_pretty, '?'),
                **colors
            )
            return r

        processes = self.entry['processes']
        full_processes = []
        if processes is None:
            # None (not available)
            reps += ' ({})'.format(NOT_SUPPORTED)
        else:
            for p in processes:
                reps += ' ' + process_repr(p)
                if show_full_cmd:
                    full_processes.append('\n' + full_process_info(p))
        if show_full_cmd and full_processes:
            # The last entry closes the tree drawing.
            full_processes[-1] = full_processes[-1].replace('├', '└', 1)
            reps += ''.join(full_processes)
        fp.write(reps)
        return fp

    def jsonify(self):
        """
        Return a copy of the entry dict prepared for JSON output.

        The 'gpu_uuid' key is removed from every process; if the process
        list is not available, a '(Not Supported)' string is used instead.
        """
        o = dict(self.entry)
        if self.entry['processes'] is not None:
            o['processes'] = [{k: v for (k, v) in p.items() if k != 'gpu_uuid'}
                              for p in self.entry['processes']]
        else:
            o['processes'] = '({})'.format(NOT_SUPPORTED)
        return o
class GPUStatCollection(object):
    """
    A list of GPUStat objects plus host-level information (host name,
    query time and driver version), with helpers to print or serialize it.
    """

    # Cache of psutil.Process objects keyed by pid, shared by all queries
    # (presumably so that cpu_percent() has an earlier sample to compare
    # against); stale entries are dropped by clean_processes().
    global_processes = {}

    def __init__(self, gpu_list, driver_version=None):
        """
        :param gpu_list: list of GPU status objects.
        :param driver_version: driver version string, or None if unknown.
        """
        self.gpus = gpu_list

        # attach additional system information
        self.hostname = platform.node()
        self.query_time = datetime.now()
        self.driver_version = driver_version

    @staticmethod
    def clean_processes():
        """Drop cached processes whose pid no longer exists."""
        for pid in list(GPUStatCollection.global_processes.keys()):
            if not psutil.pid_exists(pid):
                del GPUStatCollection.global_processes[pid]

    @staticmethod
    def new_query():
        """
        Query the information of all the GPUs on local machine.

        NVML is initialized for the duration of the query and is always
        shut down again, even if the query fails half-way.

        :return: a new GPUStatCollection.
        """

        def _decode(b):
            if isinstance(b, bytes):
                return b.decode()    # for python3, to unicode
            return b

        def get_gpu_info(handle):
            """Get one GPU information specified by nvml handle"""

            def get_process_info(nv_process):
                """Get the process information of specific pid"""
                process = {}
                if nv_process.pid not in GPUStatCollection.global_processes:
                    GPUStatCollection.global_processes[nv_process.pid] = \
                        psutil.Process(pid=nv_process.pid)
                ps_process = GPUStatCollection.global_processes[nv_process.pid]
                process['username'] = ps_process.username()
                # cmdline returns full path;
                # as in `ps -o comm`, get short cmdnames.
                _cmdline = ps_process.cmdline()
                if not _cmdline:
                    # sometimes, zombie or unknown (e.g. [kworker/8:2H])
                    process['command'] = '?'
                    process['full_command'] = ['?']
                else:
                    process['command'] = os.path.basename(_cmdline[0])
                    process['full_command'] = _cmdline
                # Bytes to MBytes; the binding reports None when the value
                # is not available, which must not be divided.
                used_memory = nv_process.usedGpuMemory
                process['gpu_memory_usage'] = \
                    used_memory // MB if used_memory is not None else None
                process['cpu_percent'] = ps_process.cpu_percent()
                process['cpu_memory_usage'] = \
                    round((ps_process.memory_percent() / 100.0) *
                          psutil.virtual_memory().total)
                process['pid'] = nv_process.pid
                return process

            name = _decode(N.nvmlDeviceGetName(handle))
            uuid = _decode(N.nvmlDeviceGetUUID(handle))

            try:
                temperature = N.nvmlDeviceGetTemperature(
                    handle, N.NVML_TEMPERATURE_GPU
                )
            except N.NVMLError:
                temperature = None  # Not supported

            try:
                fan_speed = N.nvmlDeviceGetFanSpeed(handle)
            except N.NVMLError:
                fan_speed = None  # Not supported

            try:
                memory = N.nvmlDeviceGetMemoryInfo(handle)  # in Bytes
            except N.NVMLError:
                memory = None  # Not supported

            try:
                utilization = N.nvmlDeviceGetUtilizationRates(handle)
            except N.NVMLError:
                utilization = None  # Not supported

            try:
                power = N.nvmlDeviceGetPowerUsage(handle)
            except N.NVMLError:
                power = None

            try:
                power_limit = N.nvmlDeviceGetEnforcedPowerLimit(handle)
            except N.NVMLError:
                power_limit = None

            try:
                nv_comp_processes = \
                    N.nvmlDeviceGetComputeRunningProcesses(handle)
            except N.NVMLError:
                nv_comp_processes = None  # Not supported
            try:
                nv_graphics_processes = \
                    N.nvmlDeviceGetGraphicsRunningProcesses(handle)
            except N.NVMLError:
                nv_graphics_processes = None  # Not supported

            if nv_comp_processes is None and nv_graphics_processes is None:
                processes = None
            else:
                processes = []
                nv_comp_processes = nv_comp_processes or []
                nv_graphics_processes = nv_graphics_processes or []
                for nv_process in nv_comp_processes + nv_graphics_processes:
                    try:
                        process = get_process_info(nv_process)
                        processes.append(process)
                    except psutil.NoSuchProcess:
                        # TODO: add some reminder for NVML broken context
                        # e.g. nvidia-smi reset or reboot the system
                        pass

                if processes:
                    # Take a second CPU sample after a short pause; there is
                    # nothing to sample (and no reason to block) otherwise.
                    # TODO: Do not block if full process info is not requested
                    time.sleep(0.1)
                    for process in processes:
                        pid = process['pid']
                        cache_process = GPUStatCollection.global_processes[pid]
                        try:
                            process['cpu_percent'] = \
                                cache_process.cpu_percent()
                        except psutil.NoSuchProcess:
                            # The process exited during the pause; keep the
                            # first sample instead of failing the query.
                            pass

            index = N.nvmlDeviceGetIndex(handle)
            gpu_info = {
                'index': index,
                'uuid': uuid,
                'name': name,
                'temperature.gpu': temperature,
                'fan.speed': fan_speed,
                'utilization.gpu': utilization.gpu if utilization else None,
                # NOTE(review): the // 1000 suggests NVML reports milliwatts
                # here — confirm against the NVML reference.
                'power.draw': power // 1000 if power is not None else None,
                'enforced.power.limit': power_limit // 1000
                if power_limit is not None else None,
                # Convert bytes into MBytes
                'memory.used': memory.used // MB if memory else None,
                'memory.total': memory.total // MB if memory else None,
                'processes': processes,
            }
            GPUStatCollection.clean_processes()
            return gpu_info

        N.nvmlInit()
        try:
            # 1. get the list of gpu and status
            gpu_list = []
            device_count = N.nvmlDeviceGetCount()

            for index in range(device_count):
                handle = N.nvmlDeviceGetHandleByIndex(index)
                gpu_info = get_gpu_info(handle)
                gpu_stat = GPUStat(gpu_info)
                gpu_list.append(gpu_stat)

            # 2. additional info (driver version, etc).
            try:
                driver_version = _decode(N.nvmlSystemGetDriverVersion())
            except N.NVMLError:
                driver_version = None    # N/A
        finally:
            # Release NVML even if one of the queries above raised.
            N.nvmlShutdown()
        return GPUStatCollection(gpu_list, driver_version=driver_version)

    def __len__(self):
        """Return the number of GPUs in the collection."""
        return len(self.gpus)

    def __iter__(self):
        """Iterate over the GPU status objects."""
        return iter(self.gpus)

    def __getitem__(self, index):
        """Return the GPU status object at position ``index``."""
        return self.gpus[index]

    def __repr__(self):
        """Return a multi-line listing of the host name and every GPU."""
        s = 'GPUStatCollection(host=%s, [\n' % self.hostname
        s += '\n'.join(' ' + str(g) for g in self.gpus)
        s += '\n])'
        return s

    # --- Printing Functions ---

    def print_formatted(self, fp=sys.stdout, force_color=False, no_color=False,
                        show_cmd=False, show_full_cmd=False, show_user=False,
                        show_pid=False, show_power=None, show_fan_speed=None,
                        gpuname_width=16, show_header=True,
                        eol_char=os.linesep,
                        ):
        """
        Write an optional header line and one entry per GPU to ``fp``.

        :param fp: output stream (needs ``write()`` and ``flush()``).
        :param force_color: always emit color codes.
        :param no_color: never emit color codes.
        :param show_cmd, show_full_cmd, show_user, show_pid, show_power,
            show_fan_speed: passed through to each GPU's ``print_to()``.
        :param gpuname_width: minimum width of the GPU name column; it is
            widened to fit the longest GPU name.
        :param show_header: print host name, query time and driver version.
        :param eol_char: string written after each line.
        :raises ValueError: if both force_color and no_color are set.
        """
        # ANSI color configuration
        if force_color and no_color:
            raise ValueError("--color and --no_color can't"
                             " be used at the same time")

        if force_color:
            t_color = Terminal(kind='linux', force_styling=True)

            # workaround of issue #32 (watch doesn't recognize sgr0 characters)
            t_color.normal = u'\x1b[0;10m'
        elif no_color:
            t_color = Terminal(force_styling=None)
        else:
            t_color = Terminal()   # auto, depending on isatty

        # appearance settings
        entry_name_width = [len(g.entry['name']) for g in self]
        gpuname_width = max([gpuname_width or 0] + entry_name_width)

        # header
        if show_header:
            if hasattr(locale, 'nl_langinfo'):
                time_format = locale.nl_langinfo(locale.D_T_FMT)
            else:
                # nl_langinfo is not available on every platform; fall back
                # to a fixed, locale-independent format.
                time_format = '%Y-%m-%d %H:%M:%S'

            header_template = '{t.bold_white}{hostname:{width}}{t.normal} '
            header_template += '{timestr} '
            header_template += '{t.bold_black}{driver_version}{t.normal}'

            header_msg = header_template.format(
                hostname=self.hostname,
                width=gpuname_width + 3,  # len("[?]")
                timestr=self.query_time.strftime(time_format),
                driver_version=self.driver_version,
                t=t_color,
            )

            fp.write(header_msg.strip())
            fp.write(eol_char)

        # body
        for g in self:
            g.print_to(fp,
                       show_cmd=show_cmd,
                       show_full_cmd=show_full_cmd,
                       show_user=show_user,
                       show_pid=show_pid,
                       show_power=show_power,
                       show_fan_speed=show_fan_speed,
                       gpuname_width=gpuname_width,
                       term=t_color)
            fp.write(eol_char)

        fp.flush()

    def jsonify(self):
        """
        Return a dict with 'hostname', 'query_time' (a datetime object) and
        'gpus' (the jsonify() result of every GPU).
        """
        return {
            'hostname': self.hostname,
            'query_time': self.query_time,
            "gpus": [g.jsonify() for g in self]
        }

    def print_json(self, fp=sys.stdout):
        """
        Dump jsonify() to ``fp`` as indented JSON followed by a newline.

        Objects that json cannot serialize are converted with their
        ``isoformat()`` method (e.g. the query time); anything else raises
        TypeError.
        """
        def date_handler(obj):
            if hasattr(obj, 'isoformat'):
                return obj.isoformat()
            else:
                raise TypeError(type(obj))

        o = self.jsonify()
        json.dump(o, fp, indent=4, separators=(',', ': '),
                  default=date_handler)
        fp.write('\n')
        fp.flush()
def new_query():
    """
    Build and return a fresh GPUStatCollection describing every GPU on the
    local machine, including the processes running on each of them.

    This is a module-level shortcut for ``GPUStatCollection.new_query()``,
    which gathers the data through NVML.
    """
    collection = GPUStatCollection.new_query()
    return collection
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment