精简pycallgraph代码,生成调用关系图 python3.5.1 代码解读https://ficapy.github.io/2016/07/22/pycallgraph_note/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Author: Ficapy | |
# Create: '20/7/2016' | |
import inspect | |
import os | |
import re | |
import tempfile | |
import textwrap | |
import pkgutil | |
from collections import defaultdict | |
from distutils.sysconfig import get_python_lib | |
from functools import lru_cache | |
class Output(): | |
def __init__(self, filename='call.png'): | |
self.filename = filename | |
@property | |
def edges(self): | |
output = [] | |
for src_func, dests in self.data.items(): | |
for dst_func, calls in dests.items(): | |
singal = '"{}" -> "{}" ["label"={}];'.format(src_func, dst_func, calls) | |
output.append(singal) | |
return output | |
@property | |
def groups(self): | |
output = [] | |
for group, funcs in self.group.items(): | |
# 排除一个funcs的 | |
if len(funcs) == 1: | |
continue | |
func = '" "'.join(funcs) | |
output.append( | |
'subgraph "cluster_{group}" {{ ' | |
'"{func}"; ' | |
'label = "{group}"; ' | |
'fontcolor = "black"; ' | |
'style = "bold"; }}'.format(group=group, func=func)) | |
return output | |
def generate(self): | |
'''Returns a string with the contents of a DOT file for Graphviz to | |
parse. | |
''' | |
indent_join = '\n' + ' ' * 12 | |
return textwrap.dedent('''\ | |
digraph G {{ | |
// Attributes | |
node [ style = "filled", fontname = "Verdana", shape = "rect", fontsize = "7", fontcolor = "#000000ff" ]; | |
edge [ fontname = "Verdana", fontsize = "7", fontcolor = "#000000ff" ]; | |
// Groups | |
{} | |
// Edges | |
{} | |
}} | |
'''.format( | |
indent_join.join(self.groups), | |
indent_join.join(self.edges), | |
)) | |
@staticmethod | |
def analyze(call_dict, grp): | |
func_map_line = defaultdict(int) | |
model_map_line = defaultdict(int) | |
for src_func, dests in call_dict.items(): | |
func_map_line[src_func] += len(dests) | |
for dst_func, calls in dests.items(): | |
func_map_line[dst_func] += 1 | |
print('{:=^80}{:^10}'.format('func_name', 'nums')) | |
for func, nums in sorted(func_map_line.items(), key=lambda x: x[-1], reverse=True)[:5]: | |
print('{:<80}{:^10}'.format(func, nums)) | |
for g, funcs in grp.items(): | |
for func in funcs: | |
model_map_line[g] += func_map_line.get(func, 0) | |
print('\n{:=^80}{:^10}'.format('model', 'nums')) | |
for model, nums in sorted(model_map_line.items(), key=lambda x: x[-1], reverse=True)[:5]: | |
print('{:<80}{:^10}'.format(model, nums)) | |
def __call__(self, data, group): | |
self.data = data | |
self.group = group | |
source = self.generate() | |
self.analyze(self.data, self.group) | |
fd, temp_name = tempfile.mkstemp() | |
with os.fdopen(fd, 'w') as f: | |
f.write(source) | |
cmd = '{} -T{} -o{} {}'.format('/usr/local/bin/dot', 'png', self.filename, temp_name) | |
try: | |
ret = os.system(cmd) | |
if ret: | |
raise Exception('The command "%(cmd)s" failed with error code %(ret)i.' % locals()) | |
finally: | |
os.unlink(temp_name) | |
class PyCallGraph(): | |
def __init__(self, output=None, filter=None): | |
self.frame_stack = ['__main__'] | |
self.call_dict = defaultdict(lambda: defaultdict(int)) | |
self.grp = defaultdict(set) | |
self.max_call_frame = 9999 | |
self.output = output or Output() | |
self.filter = filter or Filter() | |
def dispatch_trace(self, frame, event, args): | |
if event == 'call': | |
self._trace_call(frame, event, args) | |
elif event == 'return': | |
self._trace_return(frame, event, args) | |
return self.dispatch_trace | |
def _get_readable_funcname(self, frame): | |
func_name = type('Func_name', (object,), {'func_name': None, 'module_name': None}) | |
if isinstance(frame, str): | |
func_name.func_name = frame | |
func_name.module_name = frame | |
return func_name | |
code = frame.f_code | |
full_name_list = [] | |
module = inspect.getmodule(code) | |
if module: | |
module_name = module.__name__ | |
if module_name == '__main__': | |
module_name = '' | |
else: | |
module_name = '' | |
if module_name: | |
full_name_list.append(module_name) | |
try: | |
class_name = frame.f_locals['self'].__class__.__name__ | |
except (KeyError, AttributeError): | |
pass | |
else: | |
full_name_list.append(class_name) | |
func = code.co_name | |
if func == '?': | |
func = '__main__' | |
full_name_list.append(func) | |
func_name.func_name = '.'.join(full_name_list) | |
func_name.module_name = module_name | |
return func_name | |
def _trace_call(self, frame, event, args): | |
if len(self.frame_stack) >= self.max_call_frame: | |
return | |
func_name = self._get_readable_funcname(frame) | |
if self.filter(func_name): | |
self.max_call_frame = len(self.frame_stack) + 1 | |
src_func = self._get_readable_funcname(self.frame_stack[-1]) | |
self.filter(src_func) | |
self.call_dict[src_func.func_name][func_name.func_name] += 1 | |
self.grp[func_name.module_name].add(func_name.func_name) | |
self.frame_stack.append(frame) | |
def _trace_return(self, frame, event, args): | |
if frame is self.frame_stack[-1]: | |
self.frame_stack.pop(-1) | |
self.max_call_frame = 9999 | |
def __enter__(self): | |
sys.settrace(self.dispatch_trace) | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
sys.settrace(None) | |
self.filter.exec_exclude(self) | |
self.output(data=self.call_dict, group=self.grp) | |
inspect.getmodule = lru_cache()(inspect.getmodule) | |
class Filter(): | |
site_package = [] | |
bulit_in = [] | |
for filefinder, name, _ in pkgutil.iter_modules(): | |
if get_python_lib() == filefinder.path: | |
site_package.append(name) | |
elif '/lib/' in filefinder.path: | |
bulit_in.append(name) | |
def __init__(self, exclude=None): | |
self.exclude = exclude or [] | |
def __call__(self, func_name): | |
model = func_name.module_name | |
name = func_name.func_name | |
if model in self.exclude: | |
func_name.func_name = model | |
return True | |
if model in self.bulit_in: | |
func_name.func_name = model | |
return True | |
if '.' not in name: | |
return True | |
if '._' in name or name.startswith('_'): | |
return True | |
if re.search(r'pycallgraph', name, re.I): | |
return True | |
return False | |
def exec_exclude(self, pycallgraph): | |
for model in self.exclude: | |
self._del_element(model, pycallgraph) | |
def _del_element(self, element, pycallgraph): | |
pycallgraph.call_dict.pop(element, None) | |
for src_func, dst_func in pycallgraph.call_dict.items(): | |
dst_func.pop(element, None) | |
pycallgraph.grp.pop(element, None) | |
for _, x in pycallgraph.grp.items(): | |
x.discard(element) | |
import sys | |
import requests | |
filter = Filter() | |
filter.exclude = ['requests.structures', 'requests.models', 'requests.utils', '', 'collections.abc', 'abc', | |
'email.message'] | |
with PyCallGraph(output=Output(filename='requests.png'), filter=filter) as py: | |
requests.get('http://www.z.cn') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
===================================func_name==================================== nums | |
requests.sessions.Session.resolve_redirects 12 | |
requests.adapters.HTTPAdapter.send 9 | |
requests.sessions.Session.send 7 | |
requests.packages.urllib3.connectionpool.HTTPConnectionPool.urlopen 7 | |
requests.packages.urllib3.connectionpool.HTTPSConnectionPool.urlopen 7 | |
=====================================model====================================== nums | |
requests.sessions 46 | |
urllib.parse 32 | |
http.cookiejar 29 | |
requests.cookies 22 | |
requests.packages.urllib3.connectionpool 20 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment