Skip to content

Instantly share code, notes, and snippets.

@AndreLouisCaron
Created June 19, 2018 18:00
Show Gist options
  • Save AndreLouisCaron/adb8b5e60af88f90c6d9c77b7bb53872 to your computer and use it in GitHub Desktop.
Save AndreLouisCaron/adb8b5e60af88f90c6d9c77b7bb53872 to your computer and use it in GitHub Desktop.
CPU attribution for Python server applications on Windows
# -*- coding: utf-8 -*-
import logging
import psutil
import threading
import time
from contextlib import contextmanager
from functools import partial
@contextmanager
def trap_errors(exc_class=Exception, logger=None):
    """Swallow exceptions of type ``exc_class`` raised inside the ``with`` body.

    Each trapped exception is logged together with its traceback through
    ``logger`` (the root logger when none is supplied); exceptions of any
    other type propagate unchanged.
    """
    log = logger if logger else logging.getLogger()
    try:
        yield
    except exc_class:
        log.exception('Uncaught exception!')
def poll_periodically(stop_condition, poll_interval, fn):
    """Call ``fn`` every ``poll_interval`` seconds on the calling thread.

    Runs until ``stop_condition.wait(poll_interval)`` reports that the
    condition was set. Errors raised by ``fn`` are logged and swallowed so a
    single failure does not end the polling loop.
    """
    while True:
        stopped = stop_condition.wait(poll_interval)
        if stopped:
            break
        with trap_errors():
            fn()
@contextmanager
def poll_in_background(fn, poll_interval, join_timeout=None):
    """Call ``fn`` every ``poll_interval`` seconds on a background thread.

    The poller thread runs for the duration of the ``with`` block. On exit
    (normal or exceptional) it is signalled to stop and then joined.

    :param fn: zero-argument callable; errors it raises are logged by the
        polling loop and polling continues.
    :param poll_interval: seconds to wait between calls.
    :param join_timeout: maximum seconds to wait for the poller to finish on
        exit, or ``None`` to wait indefinitely. When a timeout is given the
        thread is made a daemon so a poller that fails to stop in time does
        not keep the interpreter alive.
    """
    stop_condition = threading.Event()
    thread = threading.Thread(
        target=partial(poll_periodically, stop_condition, poll_interval, fn),
    )
    thread.daemon = join_timeout is not None
    thread.start()
    try:
        yield
    finally:
        stop_condition.set()
        thread.join(join_timeout)
        # ``Thread.join()`` always returns None, so a timeout can only be
        # detected by checking whether the thread is still running.
        if thread.is_alive():
            logging.warning('Poller task did not join in time.')
class CpuMonitor(object):
    """Track CPU time clocked by each thread of a process.

    Each call to :meth:`sample` prints, for every thread reported by
    ``process.threads()``, the user and system CPU seconds consumed since the
    previous sample (or in total, the first time a thread is seen).
    """

    def __init__(self, process):
        # Only ``process.threads()`` is used; its entries must expose
        # ``id``, ``user_time`` and ``system_time`` (e.g. ``psutil.Process``).
        self._process = process
        self._threads = {}  # {ID: (user_time, system_time)} at last sample

    @staticmethod
    def _thread_key(thread):
        """Return the OS-level ID of a ``threading.Thread``.

        ``native_id`` (Python 3.8+) is the kernel thread ID, which is what
        ``process.threads()`` reports; on Windows it equals ``ident``, so
        ``ident`` is a valid fallback there and on older Pythons.
        """
        native_id = getattr(thread, 'native_id', None)
        return native_id if native_id is not None else thread.ident

    def sample(self):
        """Print per-thread CPU time deltas since the previous call."""
        names = {
            self._thread_key(t): t.name for t in threading.enumerate()
        }
        totals = {}
        for thread in self._process.threads():
            # Lookup thread name (if available).
            name = names.get(thread.id) or str(thread.id)
            # Compute delta time since last sample.
            old_total = self._threads.get(thread.id, (0.0, 0.0))
            new_total = (thread.user_time, thread.system_time)
            totals[thread.id] = new_total
            delta = (
                max(0.0, new_total[0] - old_total[0]),
                max(0.0, new_total[1] - old_total[1]),
            )
            # Forward info somewhere useful.
            print('Thread "%s": %.1f user, %.1f system.' % (
                name,
                delta[0],
                delta[1],
            ))
        # Keep only threads that still exist. This bounds memory in a
        # long-running process and stops a recycled thread ID from
        # inheriting a dead thread's totals.
        self._threads = totals
def main():
    """Print this process's per-thread CPU usage every 5 s for 30 s."""
    current_process = psutil.Process()
    tracker = CpuMonitor(process=current_process)
    with poll_in_background(tracker.sample, 5.0):
        time.sleep(30.0)
if __name__ == '__main__':
    # Show WARNING and above; errors logged by trap_errors() are included.
    logging.basicConfig(
        level=logging.WARNING,
    )
    main()
# -*- coding: utf-8 -*-
import ctypes
import ctypes.wintypes
import json
import logging
import threading
import time
from contextlib import contextmanager
from functools import partial
class _FILETIME(ctypes.Structure):
    """Win32 ``FILETIME`` structure, filled in by ``GetThreadTimes()``.

    Stores a 64-bit count of 100-nanosecond ticks split into two 32-bit
    halves.
    """
    _fields_ = (
        ('dwLowDateTime', ctypes.wintypes.DWORD),
        ('dwHighDateTime', ctypes.wintypes.DWORD),
    )

    def as_seconds(self):
        """Return the stored tick count converted to fractional seconds."""
        # The low half never overlaps the shifted high half, so ``+`` joins
        # them exactly like a bitwise OR would.
        ticks = (self.dwHighDateTime << 32) + self.dwLowDateTime
        return float(ticks) / 1e7
@contextmanager
def _current_thread(access=0x0040):
    """Get a "real" handle to the current thread.

    The thread is opened by ID, as opposed to using the pseudo-handle from
    ``GetCurrentThread()``. The handle is closed when the ``with`` block
    exits.

    :param access: access mask passed to ``OpenThread()``. The default
        ``0x0040`` is ``THREAD_QUERY_INFORMATION``, the right needed by
        ``GetThreadTimes()``.
    :raises OSError: if the handle cannot be opened or closed.
    """
    kernel32 = ctypes.windll.kernel32
    ident = kernel32.GetCurrentThreadId()
    handle = kernel32.OpenThread(access, False, ident)
    # Explicit checks rather than ``assert``: asserts are stripped under
    # ``python -O``, which would silently hand out a NULL handle.
    if not handle:
        raise ctypes.WinError()
    try:
        yield handle
    finally:
        if not kernel32.CloseHandle(handle):
            raise ctypes.WinError()
def _thread_times(h):
    """Get total/cumulative user time and system time for a specific thread.

    :param h: thread handle with query access, e.g. one opened by
        ``_current_thread()``.
    :returns: ``(user_seconds, system_seconds)`` as floats.
    :raises OSError: if ``GetThreadTimes()`` fails.
    """
    creation_time = _FILETIME()
    exit_time = _FILETIME()
    kernel_time = _FILETIME()
    user_time = _FILETIME()
    # Creation/exit times are required out-parameters of the API; only the
    # CPU times are used.
    status = ctypes.windll.kernel32.GetThreadTimes(
        h,
        ctypes.byref(creation_time),
        ctypes.byref(exit_time),
        ctypes.byref(kernel_time),
        ctypes.byref(user_time),
    )
    # Explicit check: an ``assert`` would be stripped under ``python -O`` and
    # zeros would be returned silently.
    if not status:
        raise ctypes.WinError()
    return (
        user_time.as_seconds(),
        kernel_time.as_seconds(),
    )
class Context(object):
    """CPU attribution tracker for one thread.

    Keeps a stack of scope labels. The CPU time elapsed between consecutive
    scope transitions is charged to the label that was innermost during that
    interval (exclusive, or "self", time). Time elapsed while no scope is
    open is not charged to any label.
    """

    def __init__(self, clock):
        # ``clock`` is a zero-argument callable returning cumulative
        # ``(user_time, system_time)`` for the tracked thread.
        self._clock = clock
        self._watch = {}  # {label: (user_time, system_time)}
        self._stack = []
        self._times = self._clock()

    def collect(self):
        """Return the accumulated ``{label: (user, system)}`` totals and reset them."""
        watch = self._watch
        self._watch = {}
        return watch

    def sample_elapsed_time(self):
        """Return ``(user, system)`` time elapsed since the previous sample.

        Each component is clamped at zero, so a decreasing clock reading
        never produces negative attribution.
        """
        old_times = self._times
        new_times = self._clock()
        self._times = new_times
        return (
            max(0.0, new_times[0] - old_times[0]),
            max(0.0, new_times[1] - old_times[1]),
        )

    def _accumulate(self, label, delta):
        """Add a ``(user, system)`` delta to the running total for ``label``."""
        total = self._watch.get(label, (0.0, 0.0))
        self._watch[label] = (
            total[0] + delta[0],
            total[1] + delta[1],
        )

    def enter_scope(self, label):
        """Push ``label``, charging time elapsed so far to the enclosing scope."""
        # Mark transition. The time since the previous transition belongs to
        # the enclosing scope, if any. Without this, a parent scope would
        # only be charged for the time after its last child scope exited.
        delta = self.sample_elapsed_time()
        if self._stack:
            self._accumulate(self._stack[-1], delta)
        # Push new label onto the stack.
        self._stack.append(label)

    def leave_scope(self):
        """Pop the innermost scope and charge it the time since the last transition."""
        label = self._stack.pop()
        delta = self.sample_elapsed_time()
        # Accumulate time spent in this stage.
        self._accumulate(label, delta)

    @contextmanager
    def scope_for(self, label):
        """Context manager pairing :meth:`enter_scope` with :meth:`leave_scope`."""
        self.enter_scope(label)
        try:
            yield
        finally:
            self.leave_scope()
# Thread-local holder. cpu_attribution() stores the calling thread's Context
# here as ``_state.context``; cpu_scope() and collect_cpu_attribution() read it.
_state = threading.local()
@contextmanager
def cpu_attribution():
    """Enable CPU attribution tracking in the current thread.

    Opens a real handle to the calling thread and installs a fresh
    ``Context`` whose clock reads that thread's user/system CPU times.
    """
    with _current_thread() as thread_handle:
        thread_clock = partial(_thread_times, thread_handle)
        _state.context = Context(thread_clock)
        # NOTE(review): ``_state.context`` is left in place after this block
        # exits, while its clock still refers to the now-closed handle. A
        # cpu_scope() used afterwards would query a closed handle.
        yield
@contextmanager
def cpu_scope(label):
    """Attribute CPU time spent in the ``with`` body to ``label``.

    Delegates to the thread-local Context installed by cpu_attribution() on
    this thread; without one, ``_state.context`` raises AttributeError.
    """
    context = _state.context
    with context.scope_for(label):
        yield
def collect_cpu_attribution():
    """Report and reset the CPU attribution metrics for the current thread.

    Prints the per-label totals accumulated since the previous collection as
    JSON shaped ``{label: {"user": ..., "system": ...}}`` and returns the
    same mapping.
    """
    raw = _state.context.collect()
    # ``items()`` works on Python 2 and 3; ``iteritems()`` is Python 2 only.
    attribution = {
        label: {
            'user': data[0],
            'system': data[1],
        }
        for label, data in raw.items()
    }
    print(json.dumps(
        attribution,
        indent=2,
        sort_keys=True,
    ))
    return attribution
def main():
    """Demo: attribute the CPU time of five busy loops to nested scopes, then report it."""
    with cpu_attribution():
        with cpu_scope('main'):
            for _ in range(5):
                with cpu_scope('count'):
                    total = 0
                    for value in range(100000):
                        total += value
        collect_cpu_attribution()
if __name__ == '__main__':
    # Script entry point: configure logging, then run the demo.
    logging.basicConfig(level=logging.WARNING)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment