精简 pycallgraph 代码,生成函数调用关系图(Python 3.5.1)。代码解读:https://ficapy.github.io/2016/07/22/pycallgraph_note/
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Author: Ficapy | |
# Create: '20/7/2016' | |
import inspect
import os
import pkgutil
import re
import shutil
import subprocess
import sys
import sysconfig
import tempfile
import textwrap
from collections import defaultdict
# NOTE: distutils is deprecated (PEP 632) and was removed from the standard
# library in Python 3.12.
from distutils.sysconfig import get_python_lib
from functools import lru_cache
class Output():
    '''Turn recorded call data into Graphviz DOT source and render it as a PNG.

    Instances are callable: ``output(data, group)`` stores both mappings,
    prints a short summary via :meth:`analyze` and runs Graphviz ``dot`` to
    write ``self.filename``.
    '''

    # Used only when no ``dot`` was given and none is found on PATH.
    _FALLBACK_DOT = '/usr/local/bin/dot'

    def __init__(self, filename='call.png', dot=None):
        '''
        :param filename: path of the PNG image to write.
        :param dot: path of the Graphviz ``dot`` executable. When omitted it is
            looked up on PATH, falling back to ``/usr/local/bin/dot``.
        '''
        self.filename = filename
        self.dot = dot
        # Start empty so ``edges``/``groups``/``generate`` work before the
        # instance has been called with real data.
        self.data = {}
        self.group = {}

    @property
    def edges(self):
        '''DOT edge statements, one per (caller, callee) pair, labelled with the call count.'''
        return [
            '"{}" -> "{}" ["label"={}];'.format(src_func, dst_func, calls)
            for src_func, dests in self.data.items()
            for dst_func, calls in dests.items()
        ]

    @property
    def groups(self):
        '''DOT ``subgraph cluster`` statements, one per group with at least two members.'''
        output = []
        for group, funcs in self.group.items():
            # Skip groups that contain a single function
            if len(funcs) == 1:
                continue
            # Sorted so the generated DOT source does not depend on set ordering
            func = '" "'.join(sorted(funcs))
            output.append(
                'subgraph "cluster_{group}" {{ '
                '"{func}"; '
                'label = "{group}"; '
                'fontcolor = "black"; '
                'style = "bold"; }}'.format(group=group, func=func))
        return output

    def generate(self):
        '''Returns a string with the contents of a DOT file for Graphviz to
        parse.
        '''
        # 12 spaces = the template's 8-space base indent plus one nesting
        # level; ``dedent`` strips the base indent after formatting.
        indent_join = '\n' + ' ' * 12
        return textwrap.dedent('''\
        digraph G {{
            // Attributes
            node [ style = "filled", fontname = "Verdana", shape = "rect", fontsize = "7", fontcolor = "#000000ff" ];
            edge [ fontname = "Verdana", fontsize = "7", fontcolor = "#000000ff" ];
            // Groups
            {}
            // Edges
            {}
        }}
        '''.format(
            indent_join.join(self.groups),
            indent_join.join(self.edges),
        ))

    @staticmethod
    def analyze(call_dict, grp, top=5):
        '''Print the functions and groups that touch the most edges.

        A function's score is the number of distinct callees it has plus the
        number of distinct callers that reach it; a group's score is the sum
        of its members' scores.

        :param call_dict: mapping ``caller -> {callee: call count}``.
        :param grp: mapping ``group name -> iterable of function names``.
        :param top: how many rows to print in each table.
        '''
        func_map_line = defaultdict(int)
        model_map_line = defaultdict(int)
        for src_func, dests in call_dict.items():
            func_map_line[src_func] += len(dests)
            for dst_func in dests:
                func_map_line[dst_func] += 1
        print('{:=^80}{:^10}'.format('func_name', 'nums'))
        for func, nums in sorted(func_map_line.items(), key=lambda x: x[-1], reverse=True)[:top]:
            print('{:<80}{:^10}'.format(func, nums))
        for g, funcs in grp.items():
            for func in funcs:
                model_map_line[g] += func_map_line.get(func, 0)
        print('\n{:=^80}{:^10}'.format('model', 'nums'))
        for model, nums in sorted(model_map_line.items(), key=lambda x: x[-1], reverse=True)[:top]:
            print('{:<80}{:^10}'.format(model, nums))

    def _dot_executable(self):
        '''Return the ``dot`` program to run: explicit setting, then PATH, then the fallback.'''
        return self.dot or shutil.which('dot') or self._FALLBACK_DOT

    def __call__(self, data, group):
        '''Store the call data, print the summary and render the PNG.

        :param data: mapping ``caller -> {callee: call count}``.
        :param group: mapping ``group name -> set of function names``.
        :raises RuntimeError: if ``dot`` cannot be started or exits non-zero.
        '''
        self.data = data
        self.group = group
        source = self.generate()
        self.analyze(self.data, self.group)
        fd, temp_name = tempfile.mkstemp()
        try:
            # Graphviz reads its input as UTF-8 by default, so do not rely on
            # the locale encoding for non-ASCII names.
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                f.write(source)
            # Argument list instead of a shell string: file names containing
            # spaces or shell metacharacters are passed through untouched.
            cmd = [self._dot_executable(), '-Tpng', '-o' + self.filename, temp_name]
            try:
                ret = subprocess.call(cmd)
            except OSError as err:
                raise RuntimeError(
                    'The command "%s" could not be started: %s' % (' '.join(cmd), err)) from err
            if ret:
                raise RuntimeError(
                    'The command "%s" failed with error code %i.' % (' '.join(cmd), ret))
        finally:
            # Covers failures while writing the file as well as while rendering
            os.unlink(temp_name)
class _FuncName(object):
    '''Mutable record naming one traced callable.

    ``func_name`` is the dotted display name and ``module_name`` the module it
    is grouped under; filters are allowed to overwrite ``func_name``.
    '''

    def __init__(self, func_name=None, module_name=None):
        self.func_name = func_name
        self.module_name = module_name


class PyCallGraph():
    '''Context manager that records who-calls-whom via ``sys.settrace``.

    On exit the collected data is pruned by the filter's ``exec_exclude`` and
    handed to ``output(data=..., group=...)``.
    '''

    # Sentinel depth meaning "no depth limit is currently active".
    _UNLIMITED_DEPTH = 9999

    def __init__(self, output=None, filter=None):
        '''
        :param output: callable receiving ``data`` and ``group`` keyword
            arguments on exit; defaults to ``Output()``.
        :param filter: callable taking a name record and returning true when
            calls made *inside* that function should not be recorded;
            defaults to ``Filter()``.
        '''
        # The string entry stands for the code that entered the ``with`` block
        self.frame_stack = ['__main__']
        self.call_dict = defaultdict(lambda: defaultdict(int))
        self.grp = defaultdict(set)
        self.max_call_frame = self._UNLIMITED_DEPTH
        self.output = Output() if output is None else output
        self.filter = Filter() if filter is None else filter

    def dispatch_trace(self, frame, event, args):
        '''Trace function installed with ``sys.settrace``; handles call/return events.'''
        if event == 'call':
            self._trace_call(frame, event, args)
        elif event == 'return':
            self._trace_return(frame, event, args)
        # Returning the tracer keeps it installed as the frame's local tracer
        return self.dispatch_trace

    def _get_readable_funcname(self, frame):
        '''Build the name record for *frame*.

        A plain string (the synthetic stack root) is used as both function and
        module name. For a real frame the display name is
        ``module.Class.function``; the module part is omitted for ``__main__``
        or when the module cannot be determined, and the class part is only
        present when the frame has a local called ``self``.
        '''
        if isinstance(frame, str):
            return _FuncName(func_name=frame, module_name=frame)
        code = frame.f_code
        module = inspect.getmodule(code)
        module_name = module.__name__ if module else ''
        if module_name == '__main__':
            module_name = ''
        full_name_list = [module_name] if module_name else []
        try:
            full_name_list.append(frame.f_locals['self'].__class__.__name__)
        except (KeyError, AttributeError):
            pass
        func = code.co_name
        if func == '?':
            func = '__main__'
        full_name_list.append(func)
        return _FuncName(func_name='.'.join(full_name_list), module_name=module_name)

    def _trace_call(self, frame, event, args):
        '''Record an edge from the current stack top to *frame* and push *frame*.'''
        # Inside a filtered function: ignore everything it calls
        if len(self.frame_stack) >= self.max_call_frame:
            return
        func_name = self._get_readable_funcname(frame)
        if self.filter(func_name):
            # Record this call itself, but nothing below it
            self.max_call_frame = len(self.frame_stack) + 1
        src_func = self._get_readable_funcname(self.frame_stack[-1])
        # Called only for its side effect: the filter may rewrite the name
        self.filter(src_func)
        self.call_dict[src_func.func_name][func_name.func_name] += 1
        self.grp[func_name.module_name].add(func_name.func_name)
        self.frame_stack.append(frame)

    def _trace_return(self, frame, event, args):
        '''Pop *frame* if it is the stack top and lift any active depth limit.'''
        if frame is self.frame_stack[-1]:
            self.frame_stack.pop(-1)
            self.max_call_frame = self._UNLIMITED_DEPTH

    def __enter__(self):
        sys.settrace(self.dispatch_trace)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.settrace(None)
        self.filter.exec_exclude(self)
        self.output(data=self.call_dict, group=self.grp)
def _memoize_getmodule(getmodule):
    '''Wrap *getmodule* in an LRU cache that still accepts unhashable arguments.

    ``lru_cache`` raises ``TypeError`` for unhashable arguments, whereas the
    plain ``inspect.getmodule`` accepts them. Because the patch below is
    process-wide, such calls fall back to the uncached function instead of
    failing.
    '''
    from functools import wraps

    cached = lru_cache()(getmodule)

    @wraps(getmodule)
    def wrapper(*args, **kwargs):
        try:
            return cached(*args, **kwargs)
        except TypeError:
            # Unhashable argument: answer without caching. A genuine TypeError
            # from getmodule itself is simply raised again by this call.
            return getmodule(*args, **kwargs)

    wrapper.cache_info = cached.cache_info
    wrapper.cache_clear = cached.cache_clear
    return wrapper


# Module lookup runs for every traced call, so cache it. The guard keeps a
# re-executed module from stacking one cache on top of another.
if not hasattr(inspect.getmodule, 'cache_info'):
    inspect.getmodule = _memoize_getmodule(inspect.getmodule)
def _classify_importable_modules():
    '''Return ``(site_package, bulit_in)`` lists of importable top-level module names.

    A module counts as a site package when its finder's directory is one of
    the interpreter's ``purelib``/``platlib`` directories or lies inside a
    ``site-packages``/``dist-packages`` directory. It counts as built in when
    the directory is the standard library directory or one below it. Modules
    found anywhere else, or through finders that expose no ``path``, are left
    unclassified.
    '''
    def normalized(path):
        return os.path.normcase(os.path.realpath(path))

    def known_dirs(*keys):
        return {normalized(p) for p in map(sysconfig.get_path, keys) if p}

    site_dirs = known_dirs('purelib', 'platlib')
    stdlib_dirs = known_dirs('stdlib', 'platstdlib')
    site_dir_names = ('site-packages', 'dist-packages')

    site_package, bulit_in = [], []
    for finder, name, _ in pkgutil.iter_modules():
        raw_path = getattr(finder, 'path', None)
        if not raw_path:
            continue
        path = normalized(raw_path)
        # Site directories are checked first because ``site-packages`` usually
        # lives inside the standard library directory.
        if path in site_dirs or any(part in site_dir_names for part in path.split(os.sep)):
            site_package.append(name)
        elif any(path == d or path.startswith(d + os.sep) for d in stdlib_dirs):
            bulit_in.append(name)
    return site_package, bulit_in


class Filter():
    '''Decide which traced functions are collapsed or kept shallow.

    Calling an instance with a name record (an object with ``func_name`` and
    ``module_name`` attributes) returns ``True`` when calls made inside that
    function should not be recorded. For excluded and standard-library modules
    the record's ``func_name`` is also replaced by the module name, so all
    their functions share one graph node.
    '''

    # Top-level module names, computed once when the class is created.
    site_package, bulit_in = _classify_importable_modules()

    def __init__(self, exclude=None):
        '''
        :param exclude: module names to collapse while tracing and to remove
            from the results in :meth:`exec_exclude`.
        '''
        self.exclude = exclude or []

    def __call__(self, func_name):
        '''Return ``True`` if calls made inside *func_name* should be ignored.'''
        model = func_name.module_name
        name = func_name.func_name
        # Excluded and standard-library modules collapse to one node each
        if model in self.exclude:
            func_name.func_name = model
            return True
        if model in self.bulit_in:
            func_name.func_name = model
            return True
        # Undotted names: nothing but the bare function name is known
        if '.' not in name:
            return True
        # Private functions, methods or modules
        if '._' in name or name.startswith('_'):
            return True
        # The tracer's own code
        if re.search(r'pycallgraph', name, re.I):
            return True
        return False

    def exec_exclude(self, pycallgraph):
        '''Remove every excluded module from *pycallgraph*'s collected data.'''
        for model in self.exclude:
            self._del_element(model, pycallgraph)

    def _del_element(self, element, pycallgraph):
        '''Drop *element* as caller, as callee, as group and as group member.'''
        pycallgraph.call_dict.pop(element, None)
        for dst_func in pycallgraph.call_dict.values():
            dst_func.pop(element, None)
        pycallgraph.grp.pop(element, None)
        for members in pycallgraph.grp.values():
            members.discard(element)
def main():
    '''Demo: trace one ``requests.get`` call and render it to ``requests.png``.'''
    # Imported here because only the demo needs this third-party package
    import requests

    # Modules collapsed while tracing and removed from the final graph
    call_filter = Filter(exclude=[
        'requests.structures', 'requests.models', 'requests.utils', '',
        'collections.abc', 'abc', 'email.message',
    ])
    with PyCallGraph(output=Output(filename='requests.png'), filter=call_filter):
        requests.get('http://www.z.cn')


# Run the demo only when executed as a script, so importing this module does
# not perform a network request or invoke Graphviz.
if __name__ == '__main__':
    main()
===================================func_name==================================== nums | |
requests.sessions.Session.resolve_redirects 12 | |
requests.adapters.HTTPAdapter.send 9 | |
requests.sessions.Session.send 7 | |
requests.packages.urllib3.connectionpool.HTTPConnectionPool.urlopen 7 | |
requests.packages.urllib3.connectionpool.HTTPSConnectionPool.urlopen 7 | |
=====================================model====================================== nums | |
requests.sessions 46 | |
urllib.parse 32 | |
http.cookiejar 29 | |
requests.cookies 22 | |
requests.packages.urllib3.connectionpool 20 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment