Created
June 19, 2018 18:00
-
-
Save AndreLouisCaron/adb8b5e60af88f90c6d9c77b7bb53872 to your computer and use it in GitHub Desktop.
CPU attribution for Python server applications on Windows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import logging | |
import psutil | |
import threading | |
import time | |
from contextlib import contextmanager | |
from functools import partial | |
@contextmanager
def trap_errors(exc_class=Exception, logger=None):
    """Run the managed block, logging matching exceptions instead of raising.

    :param exc_class: exception class (or tuple of classes) to intercept;
        anything else propagates to the caller.
    :param logger: logger that receives the traceback; the root logger is
        used when none is given.
    """
    sink = logger or logging.getLogger()
    try:
        yield
    except exc_class:
        # ``exception()`` attaches the active traceback to the log record.
        sink.exception('Uncaught exception!')
def poll_periodically(stop_condition, poll_interval, fn):
    """Call `fn` repeatedly, in the calling thread, until told to stop.

    Each iteration first waits up to `poll_interval` seconds on
    `stop_condition`; the loop ends as soon as that wait reports the event
    as set.  Exceptions raised by `fn` are handled by `trap_errors()`, so a
    failing call does not end the loop.
    """
    while True:
        if stop_condition.wait(poll_interval):
            return
        with trap_errors():
            fn()
@contextmanager
def poll_in_background(fn, poll_interval, join_timeout=None):
    """Execute `fn` at periodic intervals (in the background).

    A thread running `poll_periodically()` is started on entry and asked to
    stop on exit.

    :param fn: callable invoked (without arguments) on every tick.
    :param poll_interval: seconds to wait between calls.
    :param join_timeout: maximum number of seconds to wait for the poller
        thread on exit, or ``None`` to wait indefinitely.  When a timeout is
        given the thread is a daemon thread; a warning is logged if it is
        still running once the timeout expires.
    """
    stop_condition = threading.Event()
    thread = threading.Thread(
        target=partial(
            poll_periodically,
            stop_condition,
            poll_interval,
            fn,
        ),
    )
    # Only mark the thread as a daemon when we may stop waiting for it.
    thread.daemon = (join_timeout is not None)
    thread.start()
    try:
        yield
    finally:
        stop_condition.set()
        # ``Thread.join()`` always returns ``None``, so its result cannot
        # signal a timeout; check whether the thread is still alive instead.
        thread.join(join_timeout)
        if thread.is_alive():
            logging.warning('Poller task did not join in time.')
class CpuMonitor(object):
    """Track CPU time clocked by each thread.

    `process` must provide a ``threads()`` method returning records with
    ``id``, ``user_time`` and ``system_time`` attributes (presumably a
    ``psutil.Process``, which is what ``main()`` passes in).
    """

    def __init__(self, process):
        self._process = process
        self._threads = {}  # {ID: (user_time, system_time)}

    def sample(self):
        """Print the CPU time used by each thread since the previous sample.

        Threads are labelled with the name of the `threading` thread whose
        ``ident`` equals the reported thread ID, falling back to the numeric
        ID.  A thread seen for the first time is measured from zero.
        """
        names = {t.ident: t.name for t in threading.enumerate()}
        previous = self._threads
        current = {}
        for thread in self._process.threads():
            # Lookup thread name (if available).
            name = names.get(thread.id) or str(thread.id)

            # Compute delta time since last sample.
            old_total = previous.get(thread.id, (0.0, 0.0))
            new_total = (thread.user_time, thread.system_time)
            current[thread.id] = new_total
            delta = (
                max(0.0, new_total[0] - old_total[0]),
                max(0.0, new_total[1] - old_total[1]),
            )

            # Forward info somewhere useful.
            print('Thread "%s": %.1f user, %.1f system.' % (
                name,
                delta[0],
                delta[1],
            ))

        # Remember only the threads seen in this sample: totals of threads
        # that have exited would otherwise accumulate forever and would skew
        # the first delta of a new thread that reuses one of their IDs.
        self._threads = current
def main():
    """Demo: sample this process' per-thread CPU usage for 30 seconds."""
    current_process = psutil.Process()
    monitor = CpuMonitor(process=current_process)
    # One sample every 5 seconds while the main thread sleeps.
    with poll_in_background(monitor.sample, 5.0):
        time.sleep(30.0)
# Script entry point: configure logging, then run the demo.
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.WARNING,
    )
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import ctypes | |
import ctypes.wintypes | |
import json | |
import logging | |
import threading | |
import time | |
from contextlib import contextmanager | |
from functools import partial | |
class _FILETIME(ctypes.Structure):
    """Win32 ``FILETIME`` record, as filled in by ``GetThreadTimes()``."""

    _fields_ = (
        ('dwLowDateTime', ctypes.wintypes.DWORD),
        ('dwHighDateTime', ctypes.wintypes.DWORD),
    )

    def as_seconds(self):
        """Return the stored tick count converted to (float) seconds."""
        # The two 32-bit halves form one 64-bit count of 100ns ticks.
        high_word = self.dwHighDateTime
        low_word = self.dwLowDateTime
        ticks = (high_word << 32) | low_word
        ticks_per_second = 10 ** 7
        return float(ticks) / ticks_per_second
@contextmanager
def _current_thread(access=0x0040):
    """Get a "real" handle to the current thread.

    The handle is opened with ``OpenThread()`` on entry and released with
    ``CloseHandle()`` on exit.

    :param access: desired-access mask passed to ``OpenThread()``; the
        default ``0x0040`` is ``THREAD_QUERY_INFORMATION``.
    :raises OSError: if the handle cannot be opened or closed.
    """
    kernel32 = ctypes.windll.kernel32
    ident = kernel32.GetCurrentThreadId()
    handle = kernel32.OpenThread(access, False, ident)
    # Explicit checks rather than ``assert``: assertions are stripped when
    # Python runs with ``-O``, which would let a null handle through.
    if not handle:
        raise ctypes.WinError()
    try:
        yield handle
    finally:
        if not kernel32.CloseHandle(handle):
            raise ctypes.WinError()
def _thread_times(h):
    """Get total/cumulative user time and system time for a specific thread.

    :param h: thread handle passed to ``GetThreadTimes()``.
    :returns: ``(user_seconds, system_seconds)`` as floats.
    :raises OSError: if ``GetThreadTimes()`` reports a failure.
    """
    ctime = _FILETIME()  # Creation time (unused).
    etime = _FILETIME()  # Exit time (unused).
    ktime = _FILETIME()  # Kernel-mode (system) time.
    utime = _FILETIME()  # User-mode time.
    status = ctypes.windll.kernel32.GetThreadTimes(
        h,
        ctypes.byref(ctime),
        ctypes.byref(etime),
        ctypes.byref(ktime),
        ctypes.byref(utime),
    )
    # Explicit check rather than ``assert``: assertions are stripped when
    # Python runs with ``-O``, which would silently return zeroed times.
    if not status:
        raise ctypes.WinError()
    return (
        utime.as_seconds(),
        ktime.as_seconds(),
    )
class Context(object):
    """CPU attribution tracker for one thread.

    `clock` is a callable returning the cumulative ``(user_time,
    system_time)`` pair.  Time elapsed between two clock samples is charged
    to the innermost scope open during that interval; time elapsed while no
    scope is open is not recorded.
    """

    def __init__(self, clock):
        self._clock = clock
        self._watch = {}  # {label: (user_time, system_time)}
        self._stack = []
        self._times = self._clock()

    def collect(self):
        """Return the totals accumulated so far and start over from empty."""
        watch = self._watch
        self._watch = {}
        return watch

    def sample_elapsed_time(self):
        """Return ``(user, system)`` time elapsed since the previous sample.

        Each component is clamped so that it is never negative.
        """
        old_times = self._times
        new_times = self._clock()
        self._times = new_times
        return (
            max(0.0, new_times[0] - old_times[0]),
            max(0.0, new_times[1] - old_times[1]),
        )

    def _charge(self, label, delta):
        """Add `delta` to the running total recorded for `label`."""
        total = self._watch.get(label, (0.0, 0.0))
        self._watch[label] = (
            total[0] + delta[0],
            total[1] + delta[1],
        )

    def enter_scope(self, label):
        """Open a new scope named `label` nested in the current one."""
        # Mark transition: whatever was clocked up to this point belongs to
        # the enclosing scope (if any), not to the one being entered.
        delta = self.sample_elapsed_time()
        if self._stack:
            self._charge(self._stack[-1], delta)
        # Push new label onto the stack.
        self._stack.append(label)

    def leave_scope(self):
        """Close the innermost scope and record the time spent in it."""
        label = self._stack.pop()
        delta = self.sample_elapsed_time()
        # Accumulate time spent in this stage.
        self._charge(label, delta)

    @contextmanager
    def scope_for(self, label):
        """Context manager pairing `enter_scope()` with `leave_scope()`."""
        self.enter_scope(label)
        try:
            yield
        finally:
            self.leave_scope()
# Per-thread storage; `cpu_attribution()` stores the active tracker on it as
# the ``context`` attribute.
_state = threading.local()
@contextmanager
def cpu_attribution():
    """Enable CPU attribution tracking in the current thread.

    Installs a fresh `Context` on the thread-local state, clocked by the
    current thread's CPU times.  The thread handle is only held for the
    duration of the ``with`` block; the context itself stays installed
    afterwards.
    """
    with _current_thread() as thread_handle:
        clock = partial(_thread_times, thread_handle)
        _state.context = Context(clock)
        yield
@contextmanager
def cpu_scope(label):
    """Enter a new scope named `label` in the current thread.

    Uses the tracker stored on the thread-local state, so a ``context``
    attribute must already have been set for this thread.
    """
    tracker = _state.context
    with tracker.scope_for(label):
        yield
def collect_cpu_attribution():
    """Get CPU attribution metrics for the current thread.

    Drains the totals accumulated by the current thread's tracker, prints
    them as JSON (sorted by label) and returns them.

    :returns: ``{label: {'user': seconds, 'system': seconds}}``.
    """
    # ``items()`` rather than ``iteritems()``: the latter only exists on
    # Python 2 dictionaries.
    attribution = {
        label: {
            'user': user_time,
            'system': system_time,
        }
        for label, (user_time, system_time)
        in _state.context.collect().items()
    }
    print(json.dumps(
        attribution,
        indent=2,
        sort_keys=True,
    ))
    return attribution
def main():
    """Demo: burn some CPU in nested scopes, then print the attribution."""
    with cpu_attribution():
        with cpu_scope('main'):
            for _ in range(5):
                with cpu_scope('count'):
                    # Busy work, just to clock some CPU time.
                    total = 0
                    for value in range(100000):
                        total += value
        collect_cpu_attribution()
# Script entry point: configure logging, then run the demo.
if __name__ == '__main__':
    logging.basicConfig(level=logging.WARNING)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment