Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
精简pycallgraph代码,生成调用关系图 python3.5.1 代码解读https://ficapy.github.io/2016/07/22/pycallgraph_note/
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '20/7/2016'
import inspect
import os
import re
import tempfile
import textwrap
import pkgutil
from collections import defaultdict
from distutils.sysconfig import get_python_lib
from functools import lru_cache
class Output():
def __init__(self, filename='call.png'):
self.filename = filename
@property
def edges(self):
output = []
for src_func, dests in self.data.items():
for dst_func, calls in dests.items():
singal = '"{}" -> "{}" ["label"={}];'.format(src_func, dst_func, calls)
output.append(singal)
return output
@property
def groups(self):
output = []
for group, funcs in self.group.items():
# 排除一个funcs的
if len(funcs) == 1:
continue
func = '" "'.join(funcs)
output.append(
'subgraph "cluster_{group}" {{ '
'"{func}"; '
'label = "{group}"; '
'fontcolor = "black"; '
'style = "bold"; }}'.format(group=group, func=func))
return output
def generate(self):
'''Returns a string with the contents of a DOT file for Graphviz to
parse.
'''
indent_join = '\n' + ' ' * 12
return textwrap.dedent('''\
digraph G {{
// Attributes
node [ style = "filled", fontname = "Verdana", shape = "rect", fontsize = "7", fontcolor = "#000000ff" ];
edge [ fontname = "Verdana", fontsize = "7", fontcolor = "#000000ff" ];
// Groups
{}
// Edges
{}
}}
'''.format(
indent_join.join(self.groups),
indent_join.join(self.edges),
))
@staticmethod
def analyze(call_dict, grp):
func_map_line = defaultdict(int)
model_map_line = defaultdict(int)
for src_func, dests in call_dict.items():
func_map_line[src_func] += len(dests)
for dst_func, calls in dests.items():
func_map_line[dst_func] += 1
print('{:=^80}{:^10}'.format('func_name', 'nums'))
for func, nums in sorted(func_map_line.items(), key=lambda x: x[-1], reverse=True)[:5]:
print('{:<80}{:^10}'.format(func, nums))
for g, funcs in grp.items():
for func in funcs:
model_map_line[g] += func_map_line.get(func, 0)
print('\n{:=^80}{:^10}'.format('model', 'nums'))
for model, nums in sorted(model_map_line.items(), key=lambda x: x[-1], reverse=True)[:5]:
print('{:<80}{:^10}'.format(model, nums))
def __call__(self, data, group):
self.data = data
self.group = group
source = self.generate()
self.analyze(self.data, self.group)
fd, temp_name = tempfile.mkstemp()
with os.fdopen(fd, 'w') as f:
f.write(source)
cmd = '{} -T{} -o{} {}'.format('/usr/local/bin/dot', 'png', self.filename, temp_name)
try:
ret = os.system(cmd)
if ret:
raise Exception('The command "%(cmd)s" failed with error code %(ret)i.' % locals())
finally:
os.unlink(temp_name)
class PyCallGraph():
def __init__(self, output=None, filter=None):
self.frame_stack = ['__main__']
self.call_dict = defaultdict(lambda: defaultdict(int))
self.grp = defaultdict(set)
self.max_call_frame = 9999
self.output = output or Output()
self.filter = filter or Filter()
def dispatch_trace(self, frame, event, args):
if event == 'call':
self._trace_call(frame, event, args)
elif event == 'return':
self._trace_return(frame, event, args)
return self.dispatch_trace
def _get_readable_funcname(self, frame):
func_name = type('Func_name', (object,), {'func_name': None, 'module_name': None})
if isinstance(frame, str):
func_name.func_name = frame
func_name.module_name = frame
return func_name
code = frame.f_code
full_name_list = []
module = inspect.getmodule(code)
if module:
module_name = module.__name__
if module_name == '__main__':
module_name = ''
else:
module_name = ''
if module_name:
full_name_list.append(module_name)
try:
class_name = frame.f_locals['self'].__class__.__name__
except (KeyError, AttributeError):
pass
else:
full_name_list.append(class_name)
func = code.co_name
if func == '?':
func = '__main__'
full_name_list.append(func)
func_name.func_name = '.'.join(full_name_list)
func_name.module_name = module_name
return func_name
def _trace_call(self, frame, event, args):
if len(self.frame_stack) >= self.max_call_frame:
return
func_name = self._get_readable_funcname(frame)
if self.filter(func_name):
self.max_call_frame = len(self.frame_stack) + 1
src_func = self._get_readable_funcname(self.frame_stack[-1])
self.filter(src_func)
self.call_dict[src_func.func_name][func_name.func_name] += 1
self.grp[func_name.module_name].add(func_name.func_name)
self.frame_stack.append(frame)
def _trace_return(self, frame, event, args):
if frame is self.frame_stack[-1]:
self.frame_stack.pop(-1)
self.max_call_frame = 9999
def __enter__(self):
sys.settrace(self.dispatch_trace)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
sys.settrace(None)
self.filter.exec_exclude(self)
self.output(data=self.call_dict, group=self.grp)
inspect.getmodule = lru_cache()(inspect.getmodule)
class Filter():
site_package = []
bulit_in = []
for filefinder, name, _ in pkgutil.iter_modules():
if get_python_lib() == filefinder.path:
site_package.append(name)
elif '/lib/' in filefinder.path:
bulit_in.append(name)
def __init__(self, exclude=None):
self.exclude = exclude or []
def __call__(self, func_name):
model = func_name.module_name
name = func_name.func_name
if model in self.exclude:
func_name.func_name = model
return True
if model in self.bulit_in:
func_name.func_name = model
return True
if '.' not in name:
return True
if '._' in name or name.startswith('_'):
return True
if re.search(r'pycallgraph', name, re.I):
return True
return False
def exec_exclude(self, pycallgraph):
for model in self.exclude:
self._del_element(model, pycallgraph)
def _del_element(self, element, pycallgraph):
pycallgraph.call_dict.pop(element, None)
for src_func, dst_func in pycallgraph.call_dict.items():
dst_func.pop(element, None)
pycallgraph.grp.pop(element, None)
for _, x in pycallgraph.grp.items():
x.discard(element)
import sys
import requests
filter = Filter()
filter.exclude = ['requests.structures', 'requests.models', 'requests.utils', '', 'collections.abc', 'abc',
'email.message']
with PyCallGraph(output=Output(filename='requests.png'), filter=filter) as py:
requests.get('http://www.z.cn')
===================================func_name==================================== nums
requests.sessions.Session.resolve_redirects 12
requests.adapters.HTTPAdapter.send 9
requests.sessions.Session.send 7
requests.packages.urllib3.connectionpool.HTTPConnectionPool.urlopen 7
requests.packages.urllib3.connectionpool.HTTPSConnectionPool.urlopen 7
=====================================model====================================== nums
requests.sessions 46
urllib.parse 32
http.cookiejar 29
requests.cookies 22
requests.packages.urllib3.connectionpool 20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment