Skip to content

Instantly share code, notes, and snippets.

@Andrei-Pozolotin
Last active September 3, 2019 22:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Andrei-Pozolotin/34a4a4f4e1cadf7893f9eecf695def2a to your computer and use it in GitHub Desktop.
Save Andrei-Pozolotin/34a4a4f4e1cadf7893f9eecf695def2a to your computer and use it in GitHub Desktop.
a-pyinstrument-70
https://github.com/joerick/pyinstrument/issues/70
0.000 [0.000] <module> @ pysrc/runfiles.py:5
0.000 [0.000] main @ pysrc/runfiles.py:15
0.000 [0.000] main @ config/__init__.py:43
3.921 [0.000] _bootstrap @ python3.7/threading.py:876
3.921 [0.000] _bootstrap_inner @ python3.7/threading.py:911
3.921 [0.000] run @ python3.7/threading.py:859
3.921 [0.000] dummy_thread @ mail_serv_test/profiler_test.py:21
2.619 [0.020] dummy_fun2 @ mail_serv_test/profiler_test.py:12
1.302 [0.010] dummy_fun1 @ mail_serv_test/profiler_test.py:8
import os
import sys
import time
import threading
from dataclasses import dataclass, field
from typing import Mapping, List, Set, Iterable
from collections import defaultdict
from time import process_time
from contextlib import contextmanager
from pyinstrument import Profiler
from pyinstrument_cext import setstatprofile
from pyinstrument.session import ProfilerSession
from pyinstrument.renderers.console import ConsoleRenderer
@dataclass
class FrameStat:
    """Sampling statistics collected for one frame identity.

    The read-only properties derive display values from the code object of
    the stored ``frame`` and therefore require ``frame`` to be set.
    """

    count: int = 0  # number of samples attributed to this entry
    time_this: float = 0  # time accumulated on this entry itself
    time_total: float = -1  # time of this entry plus its leaves
    frame: object = None  # frame most recently reported for this entry
    leaf_guid_set: Set[str] = field(default_factory=set)  # guids of direct leaves

    @property
    def guid(self) -> str:
        """Identity string of the stored frame, as built by ``frame_guid``."""
        return frame_guid(self.frame)

    @property
    def file_path(self) -> str:
        """File name recorded in the frame's code object."""
        return self.frame.f_code.co_filename

    @property
    def file_name(self) -> str:
        """Last path component of ``file_path``."""
        return os.path.basename(self.file_path)

    @property
    def unit_name(self) -> str:
        """Short location in the form "package/module.py"."""
        parent_dir = os.path.dirname(self.file_path)
        package = os.path.basename(parent_dir)
        return f"{package}/{self.file_name}"

    @property
    def line_num(self) -> int:
        """First line number of the frame's code object."""
        return self.frame.f_code.co_firstlineno

    @property
    def code_name(self) -> str:
        """Name of the frame's code object (module or function name)."""
        return self.frame.f_code.co_name

    @property
    def unit_time(self) -> str:
        """Average ``time_this`` per sample, formatted with three decimals."""
        per_sample = self.time_this / self.count if self.count else 0
        return f"{per_sample:.3f}"

    @property
    def total_time(self) -> str:
        """``time_total`` formatted with three decimals."""
        return f"{self.time_total:.3f}"

    def render_line(self, line_tabs: str) -> str:
        """Return one report line for this entry, prefixed with ``line_tabs``.

        Layout: ``<total> [<per sample>] <name> @ <package/file>:<line> ``
        (the trailing space is part of the output).
        """
        timing = f"{self.total_time} [{self.unit_time}]"
        location = f"{self.unit_name}:{self.line_num}"
        return f"{line_tabs}{timing} {self.code_name} @ {location} "
class SystemProfiler:
    """Continuous multi-threaded sampling profiler.

    Samples are delivered to ``profiler_interrupt`` by ``setstatprofile`` and
    accumulated in ``frame_stat_map``, keyed by the guid of the sampled frame.
    """

    interval: float  # sampling interval handed to setstatprofile
    time_init: float  # perf_counter value taken at construction
    time_past: float  # perf_counter value taken at the previous sample
    profiler_lock: threading.Lock  # serialises sample processing
    frame_stat_map: Mapping[str, "FrameStat"]  # frame guid -> statistics

    def __init__(self, interval: float = 0.001):
        """Create the profiler and activate it for the constructing thread.

        Args:
            interval: sampling interval passed to ``setstatprofile``.
        """
        self.interval = interval
        self.frame_stat_map = dict()
        time_start = time.perf_counter()
        self.time_init = time_start
        self.time_past = time_start
        self.profiler_lock = threading.Lock()
        self.activate_interrupt()

    def activate_interrupt(self) -> None:
        """Register ``profiler_interrupt`` via ``setstatprofile``.

        Called once from ``__init__``; the original note says it must also be
        called from every extra thread that should be profiled.
        """
        setstatprofile(self.profiler_interrupt, self.interval)

    def profiler_interrupt(self, frame: object, event: str, *_) -> None:
        """Account one sample delivered by the interrupt.

        The time elapsed since the previous sample is added to the entry of
        the sampled frame. For a ``'call'`` event the parent frame is charged
        instead (presumably because the elapsed time was spent before the
        call was entered -- confirm against pyinstrument_cext).

        Args:
            frame: frame object reported by the interrupt.
            event: event name; only ``'call'`` is treated specially.
        """
        with self.profiler_lock:
            time_next = time.perf_counter()
            time_diff = time_next - self.time_past
            self.time_past = time_next
            if event == 'call':
                frame = frame.f_back
            if frame is None:
                # A 'call' on an entry frame has no parent to charge; building
                # a guid from None would raise inside the interrupt callback.
                return
            stat_guid = frame_guid(frame)
            stat_node = self.frame_stat_map.get(stat_guid)
            if stat_node is None:
                stat_node = self.frame_stat_map[stat_guid] = FrameStat()
            stat_node.count += 1
            stat_node.time_this += time_diff
            stat_node.frame = frame  # the most recent frame wins
def frame_guid(frame: object) -> str:
    """Return the identity string ``<file name>:<first line>:<code name>`` of a frame."""
    code = frame.f_code
    return "{}:{}:{}".format(code.co_filename, code.co_firstlineno, code.co_name)
def frame_base_guid(frame: object) -> str:
    """Return the guid of the frame's parent (``f_back``), or None when there is no parent."""
    parent = frame.f_back
    return frame_guid(parent) if parent else None
def update_call_tree(frame_stat_map: Mapping[str, "FrameStat"]) -> None:
    """Rebuild parent/leaf links and total times for all collected entries.

    Works in three passes over snapshots of the map (the map may be receiving
    new samples while this runs, so the result is only eventually consistent):

    1. add an entry for every ancestor frame that has no entry yet;
    2. reset ``time_total`` and register each entry as a leaf of its parent;
    3. recompute ``time_total`` as own time plus the totals of all leaves.

    Args:
        frame_stat_map: guid -> FrameStat map; updated in place.
    """
    # ensure missing nodes
    for node_stat in list(frame_stat_map.values()):
        frame = node_stat.frame
        while frame is not None:
            unit_guid = frame_guid(frame)
            if unit_guid not in frame_stat_map:
                frame_stat_map[unit_guid] = FrameStat(frame=frame)
            frame = frame.f_back

    # update hierarchy links
    for node_guid, node_stat in list(frame_stat_map.items()):
        node_stat.time_total = -1  # reset time cache
        base_guid = frame_base_guid(node_stat.frame)
        # A directly recursive function has the same guid as its parent.
        # Linking it to itself would double its total time and make the
        # tree renderer recurse without end, so self-links are skipped.
        if base_guid and base_guid != node_guid:
            base_stat = frame_stat_map.get(base_guid)
            if base_stat is not None:
                base_stat.leaf_guid_set.add(node_guid)

    # update time cache
    def update_total_time(node_stat: "FrameStat") -> None:
        if node_stat.time_total < 0:  # negative marks "needs update"
            node_stat.time_total = node_stat.time_this
            for leaf_guid in node_stat.leaf_guid_set:
                leaf_stat = frame_stat_map.get(leaf_guid)
                if leaf_stat is not None:
                    update_total_time(leaf_stat)
                    node_stat.time_total += leaf_stat.time_total

    # update time total
    for node_stat in list(frame_stat_map.values()):
        update_total_time(node_stat)
def frame_root_set(frame_stat_map: Mapping[str, "FrameStat"]) -> Set[str]:
    """Return the guids of all entries whose frame has no parent guid.

    The map is copied into a list before scanning it.
    """
    snapshot = list(frame_stat_map.items())
    return {
        node_guid
        for node_guid, node_stat in snapshot
        if not frame_base_guid(node_stat.frame)
    }
def render_stat_tree(frame_stat_map: Mapping[str, "FrameStat"]) -> str:
    """Render the complete call tree: one sub-report per root, roots sorted by guid."""
    ordered_roots = sorted(frame_root_set(frame_stat_map))
    return "\n".join(
        render_stat_root(root_guid, frame_stat_map)
        for root_guid in ordered_roots
    )
def render_stat_root(
    root_guid: str,
    frame_stat_map: Mapping[str, "FrameStat"],
) -> str:
    """Render the call tree below one root guid as newline-joined report lines."""
    rendered_lines: List[str] = []
    # the root itself is rendered without any indentation prefix
    render_stat_node(root_guid, "", rendered_lines, frame_stat_map)
    return "\n".join(rendered_lines)
def render_stat_node(
    base_guid: str,
    line_tabs: str,
    line_list: List[str],
    frame_stat_map: Mapping[str, "FrameStat"],
) -> None:
    """Append the report lines of one entry and its leaves to ``line_list``.

    Leaves are visited in sorted guid order so the report is stable between
    runs, and a guid that is already being rendered higher up on the current
    path is skipped so that cyclic leaf links (recursive calls) cannot cause
    endless recursion.

    Args:
        base_guid: guid of the entry to render first.
        line_tabs: indentation prefix for that entry; each level below it
            gets one more space.
        line_list: output list, extended in place.
        frame_stat_map: guid -> FrameStat map to read entries from.

    Raises:
        KeyError: if a guid to render is missing from ``frame_stat_map``.
    """
    active_guids = set()  # guids on the path from base_guid to the current node

    def render(node_guid: str, node_tabs: str) -> None:
        if node_guid in active_guids:
            return
        node_stat = frame_stat_map[node_guid]
        line_list.append(node_stat.render_line(node_tabs))
        active_guids.add(node_guid)
        for leaf_guid in sorted(node_stat.leaf_guid_set):
            render(leaf_guid, node_tabs + " ")
        active_guids.discard(node_guid)

    render(base_guid, line_tabs)
# NOTE(review): star imports; THIS_DIR is not defined in the visible code and
# presumably comes from one of these packages -- confirm.
from mail_serv_test import *
from mail_serv.profiler import *
# Set PROFILER_REPORT_DIR to the tmp/profiler directory under THIS_DIR.
os.environ['PROFILER_REPORT_DIR'] = f"{THIS_DIR}/tmp/profiler"
def dummy_fun1(index: int) -> None:
    """Sleep for 0.010 seconds; ``index`` is accepted but not used."""
    pause_seconds = 0.010
    time.sleep(pause_seconds)
def dummy_fun2(index: int) -> None:
    """Sleep for 0.020 seconds; ``index`` is accepted but not used."""
    pause_seconds = 0.020
    time.sleep(pause_seconds)
def test_system_profiler():
    """Manual demo: profile worker threads and print the call tree every second.

    The loop never terminates on its own; each round starts a new daemon
    thread running the dummy workload, rebuilds the call tree from the
    samples collected so far and prints it.
    """
    print()
    profiler = SystemProfiler()  # active in main thread

    def dummy_thread():
        profiler.activate_interrupt()  # active in extra thread
        for index in range(10):
            dummy_fun1(index)
            dummy_fun2(index)

    while True:
        # daemon=True replaces the deprecated Thread.setDaemon(True)
        thread = threading.Thread(target=dummy_thread, daemon=True)
        thread.start()
        update_call_tree(profiler.frame_stat_map)
        output = render_stat_tree(profiler.frame_stat_map)
        print("===")
        print(output)
        time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment