Last active
August 21, 2022 06:56
-
-
Save dutc/fecc69b8ea3f5ee9a30c8408caeba212 to your computer and use it in GitHub Desktop.
Case Studies in Python Memory Analysis using High-Watermark Testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from collections import namedtuple | |
from functools import wraps | |
from inspect import signature | |
from itertools import islice, tee | |
from math import isclose | |
from shlex import quote | |
from subprocess import check_call, DEVNULL, CalledProcessError | |
from textwrap import dedent | |
from numpy import arange | |
nwise = lambda g, *, n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n)))) | |
def binary_search(values): | |
lidx, ridx = 0, len(values) - 1 | |
while True: | |
if (ridx - lidx) <= 1: | |
break | |
if (yield values[(midx := (ridx - lidx) // 2 + lidx)]): | |
lidx = midx | |
else: | |
ridx = midx | |
class BisectedRun(namedtuple('RunBase', 'signature result exception')): | |
sig = property(lambda s: s.signature) | |
res = property(lambda s: s.result) | |
exc = property(lambda s: s.exception) | |
@lambda cls: cls() | |
class args: | |
def __get__(self, instance, owner): | |
return namedtuple('Arguments', instance.sig.arguments.keys())(**instance.sig.arguments) | |
@classmethod | |
def support(cls, arguments, exceptions): | |
def dec(f): | |
sig = signature(f) | |
@lambda bisected: setattr(f, 'bisected', bisected) | |
@wraps(arguments) | |
def bisected(*arg_args, **arg_kwargs): | |
ci = binary_search(arguments(*arg_args, **arg_kwargs)) | |
@wraps(f) | |
def inner(*f_fixed_args, **f_fixed_kwargs): | |
(f_args, f_kwargs), exc = next(ci), None | |
while True: | |
f_bound_args = sig.bind( | |
*f_fixed_args, *f_args, | |
**f_fixed_kwargs, **f_kwargs | |
) | |
try: | |
res = f(*f_bound_args.args, **f_bound_args.kwargs) | |
except (*exceptions,) as e: | |
exc = e | |
else: | |
exc = None | |
yield cls(f_bound_args, exc, res) | |
try: | |
f_args, f_kwargs = ci.send(exc is not None) | |
except StopIteration: | |
break | |
return inner | |
return f | |
return dec | |
@BisectedRun.support( | |
arguments=lambda min_vmem, max_vmem: | |
[((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 1024, dtype=int)], | |
exceptions={CalledProcessError}, | |
) | |
def run_python_ulimit(code, vmem): | |
return check_call([ | |
'sh', '-c', f'''( | |
ulimit -v {vmem} | |
python -c {quote(code)} | |
)''' | |
], stderr=DEVNULL, stdout=DEVNULL) | |
if __name__ == '__main__': | |
#### Case Study: NumPy Import | |
if True: | |
print(' {} '.format('What is the minimum memory needed to `import numpy`?').center(120, '\N{box drawings light horizontal}')) | |
code = dedent(''' | |
import numpy | |
''').strip() | |
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)): | |
pass | |
bad_mem, ok_mem = sorted([prev_run.args.vmem, curr_run.args.vmem]) | |
try: | |
run_python_ulimit(code, ok_mem) | |
except Exception as e: | |
raise ValueError(f'unanticipated failure with {ok_mem = }') from e | |
try: | |
run_python_ulimit(code, bad_mem) | |
except Exception as e: pass | |
else: raise ValueError(f'unanticipated success with {bad_mem = }') | |
print(f'Need `ulimit -v ≥{ok_mem}` to `import numpy`.') | |
#### Case Study: Attribute Dictionaries | |
if True: | |
print(' {} '.format('What is the preferred implementation of `attrdict`?').center(120, '\N{box drawings light horizontal}')) | |
attrdict_protocols = dedent(''' | |
# attrdict via dispatch to `dict.__*item__` protocols | |
class attrdict(dict): | |
__getattr__ = dict.__getitem__ | |
__setattr__ = dict.__setitem__ | |
__delattr__ = dict.__delitem__ | |
''') | |
attrdict_circular = dedent(''' | |
# attrdict via circular reference on `self.__dict__` | |
class attrdict(dict): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.__dict__ = self | |
''') | |
simple_test = dedent(''' | |
for _ in range(2): | |
d = attrdict({idx: None for idx in range(100_000)}) | |
''') | |
sustained_test = dedent(''' | |
for _ in range(100): | |
d = attrdict({idx: None for idx in range(100_000)}) | |
''') | |
code = '\n'.join([attrdict_protocols, simple_test]) | |
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)): | |
pass | |
attrdict_protocols_ok_mem = max([prev_run.args.vmem, curr_run.args.vmem]) | |
code = '\n'.join([attrdict_circular, simple_test]) | |
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)): | |
pass | |
attrdict_circular_ok_mem = max([prev_run.args.vmem, curr_run.args.vmem]) | |
assert isclose(attrdict_protocols_ok_mem, attrdict_circular_ok_mem, abs_tol=1024), 'memory usage in simple test should be close (≤1MiB)' | |
code = '\n'.join([attrdict_protocols, sustained_test]) | |
try: | |
run_python_ulimit(code, attrdict_protocols_ok_mem) | |
except Exception as e: | |
raise ValueError(f'unanticipated failure with {attrdict_protocols_ok_mem = } in sustained run') from e | |
code = '\n'.join([attrdict_circular, sustained_test]) | |
try: | |
run_python_ulimit(code, attrdict_circular_ok_mem) | |
except Exception as e: pass | |
else: | |
raise ValueError(f'unanticipated success with {attrdict_circular_ok_mem = } in sustained run') | |
print( | |
'Implementation with circular reference creates memory pressure as a result of cyclic garbage collection delay;', | |
'fails under sustained test with allocations in tight loop.' | |
) | |
#### Case Study: NumPy Memory Leak | |
if True: | |
print(' {} '.format("Is there a memory leak with circular reference of `numpy.ndarray` with `dtype='O'`?").center(120, '\N{box drawings light horizontal}')) | |
core_code = dedent(''' | |
from numpy import array | |
def create_normal_array(size=100_000): | |
xs = array([None for _ in range(size)], dtype=object) | |
ys = array([None for _ in range(size)], dtype=object) | |
return xs, ys | |
def create_circular_array(size=100_000): | |
xs = array([None for _ in range(size)], dtype=object) | |
ys = array([None for _ in range(size)], dtype=object) | |
xs[0], ys[0] = ys, xs # circular reference | |
return xs, ys | |
''') | |
baseline_test = dedent(''' | |
for _ in range(2): | |
xs, ys = create_normal_array() | |
''') | |
sustained_test = dedent(''' | |
for _ in range(100): | |
xs, ys = create_circular_array() | |
''') | |
verification_test = dedent(''' | |
for _ in range(100): | |
xs, ys = create_normal_array() | |
''') | |
code = '\n'.join([core_code, baseline_test]) | |
for prev_run, curr_run in nwise(run_python_ulimit.bisected(0, 2**20)(code)): | |
pass | |
ok_mem = max([prev_run.args.vmem, curr_run.args.vmem]) | |
code = '\n'.join([core_code, verification_test]) | |
try: | |
run_python_ulimit(code, ok_mem) | |
except Exception as e: | |
raise ValueError(f'unanticipated failure with {attrdict_protocols_ok_mem = } in sustained run') from e | |
code = '\n'.join([core_code, sustained_test]) | |
try: | |
run_python_ulimit(code, ok_mem) | |
except Exception as e: pass | |
else: | |
raise ValueError(f'unanticipated success with {attrdict_circular_ok_mem = } in sustained run') | |
print( | |
"There is a memory leak with circular references in `numpy.ndarray` with `dtype='0'", | |
"resulting from missing `tp_traverse` implementation in PyArray_Type", | |
'(https://github.com/numpy/numpy/blob/17d730ae32f5f60c9c2ca75d202b4e866debd686/numpy/core/src/multiarray/arrayobject.c#L1327)', | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment