Created
August 2, 2018 04:03
-
-
Save Hasenpfote/935f8a3a207aed93d3afba90e94060e2 to your computer and use it in GitHub Desktop.
Take a snapshot in a function.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import inspect | |
import ast | |
import types | |
import linecache | |
from tracemalloc import start, take_snapshot, Filter | |
DUMMY_SRC_NAME = '<tracemalloc_utils-src>' | |
class Transformer(ast.NodeTransformer):
    '''Instrument function definitions with tracemalloc calls.

    A visited function is rewritten so that it calls ``start()`` before its
    original statements and, in a ``finally`` clause, stores the result of
    ``take_snapshot()`` in the global variable named by ``result_id``.
    The names ``start`` and ``take_snapshot`` are emitted as plain
    identifiers, so they must be resolvable where the rewritten code runs.
    '''

    def __init__(self, result_id):
        # Name of the global variable that receives the snapshot.
        self._result_id = result_id

    def visit_FunctionDef(self, node):
        '''Move the body of ``node`` into a ``try``/``finally`` and return it.

        The visitor does not recurse into the function body, so functions
        nested inside ``node`` are left as they are.
        '''
        # start()
        begin_tracing = ast.Expr(
            value=ast.Call(
                func=ast.Name(id='start', ctx=ast.Load()),
                args=[],
                keywords=[],
            ),
        )
        # global <result_id>
        declare_result = ast.Global(names=[self._result_id])
        # <result_id> = take_snapshot()
        store_snapshot = ast.Assign(
            targets=[ast.Name(id=self._result_id, ctx=ast.Store())],
            value=ast.Call(
                func=ast.Name(id='take_snapshot', ctx=ast.Load()),
                args=[],
                keywords=[],
            ),
        )

        guarded = ast.Try(
            body=[begin_tracing] + list(node.body),
            handlers=[],
            orelse=[],
            finalbody=[declare_result, store_snapshot],
        )
        # Swap the contents in place so the node keeps its original list.
        node.body[:] = [guarded]

        # The synthesized nodes carry no positions yet; compile() needs them.
        return ast.fix_missing_locations(node)
def take_inner_snapshot(
    function,
    function_args,
    setup='pass'
):
    '''Take an inner snapshot.

    The source of ``function`` is re-parsed, instrumented by ``Transformer``
    and executed inside this module's namespace. The instrumented copy
    starts tracemalloc on entry and records a snapshot when it exits, even
    if it raises.

    Args:
        function: A plain Python function whose source ``inspect`` can
            retrieve.
        function_args: Dict of keyword arguments passed to the function.
        setup: Source code executed beforehand. The names it binds
            (typically imports the function needs) are made available as
            globals of this module for the duration of the call.

    Returns:
        A ``(snapshot, source)`` tuple: the snapshot recorded by the
        instrumented function and the source text reported by
        ``inspect.getsource``.

    Raises:
        TypeError: If an argument has the wrong type.

    Any exception raised by ``setup`` or by the function itself propagates
    to the caller. This function does not stop tracemalloc tracing.
    '''
    import textwrap

    if not isinstance(function, types.FunctionType):
        raise TypeError(
            'function must be a Python function, not {}'.format(
                type(function).__name__
            )
        )
    if not isinstance(function_args, dict):
        raise TypeError(
            'function_args must be a dict, not {}'.format(
                type(function_args).__name__
            )
        )
    if not isinstance(setup, str):
        raise TypeError(
            'setup must be a str, not {}'.format(type(setup).__name__)
        )

    module_globals = globals()
    missing = object()

    # Run the setup code; every name it binds is collected in ``injected``.
    injected = {'SNAPSHOT': None}
    exec(compile(setup, DUMMY_SRC_NAME, 'exec'), module_globals, injected)

    # Remember what each injected name shadows so the module can be put back
    # exactly as it was (setup may rebind names this module itself uses).
    shadowed = {name: module_globals.get(name, missing) for name in injected}
    module_globals.update(injected)
    try:
        # Modify the function.
        source = inspect.getsource(function)
        # Nested functions and functions from a class body come back
        # indented, which ast.parse rejects; dedent keeps line numbers intact.
        tree = ast.parse(textwrap.dedent(source))
        tree = Transformer(result_id='SNAPSHOT').visit(tree)

        # Take the snapshot.
        namespace = {}
        exec(compile(tree, DUMMY_SRC_NAME, 'exec'), module_globals, namespace)
        namespace[function.__name__](**function_args)
        snapshot = module_globals['SNAPSHOT']
    finally:
        # Restore, even when the profiled function raised.
        for name, previous in shadowed.items():
            if previous is missing:
                module_globals.pop(name, None)
            else:
                module_globals[name] = previous

    return snapshot, source
def display_top(
    snapshot,
    source=None,
    key_type='lineno',
    limit=10
):
    '''Print the largest allocations recorded in a tracemalloc snapshot.

    Args:
        snapshot: Snapshot object to report on.
        source: Optional source text of the profiled function. When given,
            the report is restricted to traces attributed to
            ``DUMMY_SRC_NAME`` and code lines are looked up in this text.
        key_type: Grouping key handed to ``snapshot.statistics``.
        limit: Number of entries printed individually; the remaining
            entries are summarized in a single line.
    '''
    # NOTE(review): the original indentation was lost; the trace filter is
    # assumed to apply only when ``source`` is supplied - confirm.
    source_lines = None
    if source is not None:
        source_lines = source.split(sep='\n')
        snapshot = snapshot.filter_traces([Filter(True, DUMMY_SRC_NAME)])

    top_stats = snapshot.statistics(key_type)
    leading = top_stats[:limit]
    remainder = top_stats[limit:]

    print('Top {} lines'.format(limit))
    for rank, stat in enumerate(leading, 1):
        frame = stat.traceback[0]
        # replace "/path/to/module/file.py" with "module/file.py"
        short_name = os.sep.join(frame.filename.split(os.sep)[-2:])
        print(
            '#{index}: {filename}:{lineno}: {size:.1f} KiB'.format(
                index=rank,
                filename=short_name,
                lineno=frame.lineno,
                size=stat.size / 1024
            )
        )

        if frame.filename != DUMMY_SRC_NAME:
            # Regular file: let linecache fetch the line.
            text = linecache.getline(frame.filename, frame.lineno).strip()
        elif source_lines is not None:
            # Instrumented code: only the caller-supplied text has the line.
            text = source_lines[frame.lineno - 1].strip()
        else:
            text = ''
        if text:
            print(' %s' % text)

    if remainder:
        remainder_size = sum(stat.size for stat in remainder)
        print(
            '{length} other: {size:.1f} KiB'.format(
                length=len(remainder),
                size=remainder_size / 1024
            )
        )

    grand_total = sum(stat.size for stat in top_stats)
    print('Total allocated size: {size:.1f} KiB'.format(size=grand_total / 1024))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment