Skip to content

Instantly share code, notes, and snippets.

@dutc
Last active August 21, 2022 06:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dutc/fecc69b8ea3f5ee9a30c8408caeba212 to your computer and use it in GitHub Desktop.
Save dutc/fecc69b8ea3f5ee9a30c8408caeba212 to your computer and use it in GitHub Desktop.
Case Studies in Python Memory Analysis using High-Watermark Testing
#!/usr/bin/env python3
from collections import namedtuple
from functools import wraps
from inspect import signature
from itertools import islice, tee
from math import isclose
from shlex import quote
from subprocess import check_call, DEVNULL, CalledProcessError
from textwrap import dedent
from numpy import arange
def nwise(g, *, n=2):
    """Return an iterator of overlapping ``n``-tuples of consecutive items.

    ``nwise('abcd')`` produces ``('a', 'b'), ('b', 'c'), ('c', 'd')``.

    Args:
        g: any iterable; it is consumed through ``itertools.tee``.
        n: keyword-only window width (default 2, i.e. pairwise).

    Returns:
        A ``zip`` object over ``n`` copies of ``g``, the i-th copy advanced by
        ``i`` items, so iteration ends when the most-advanced copy runs out.
    """
    # A ``def`` (rather than a lambda bound to a name) gives the function a
    # real ``__name__`` in tracebacks; distinct loop names avoid shadowing ``g``.
    shifted = (islice(copy, skip, None) for skip, copy in enumerate(tee(g, n)))
    return zip(*shifted)
def binary_search(values):
    """Drive a bisection over ``values`` as a send-controlled generator.

    Each step yields the element midway between the current lower and upper
    index.  The consumer answers through ``send()``: a truthy reply raises the
    lower bound to the yielded position, a falsy reply (including the ``None``
    delivered by a plain ``next()``) lowers the upper bound to it.

    The generator finishes once the two bounds are adjacent, so the first and
    last elements of ``values`` are never yielded, and sequences shorter than
    three elements yield nothing.
    """
    low, high = 0, len(values) - 1
    while high - low > 1:
        middle = low + (high - low) // 2
        move_up = yield values[middle]
        if move_up:
            low = middle
        else:
            high = middle
class BisectedRun(namedtuple('RunBase', 'signature result exception')):
    """Record of a single probe made while bisecting a function's arguments.

    Fields:
        signature: the ``inspect.BoundArguments`` the function was called with.
        result: the function's return value, or ``None`` when the call raised.
        exception: the caught exception, or ``None`` when the call succeeded.
    """

    # Short aliases for the namedtuple fields.
    sig = property(lambda s: s.signature)
    res = property(lambda s: s.result)
    exc = property(lambda s: s.exception)

    # The decorator instantiates the class immediately, so ``args`` is a
    # descriptor *instance* living on ``BisectedRun``.
    @lambda cls: cls()
    class args:
        """Descriptor exposing the bound call arguments as a namedtuple."""

        def __get__(self, instance, owner):
            if instance is None:
                # Class-level access (``BisectedRun.args``) has no bound
                # arguments to show; return the descriptor itself.
                return self
            bound = instance.sig.arguments
            return namedtuple('Arguments', bound.keys())(**bound)

    @classmethod
    def support(cls, arguments, exceptions):
        """Build a decorator that attaches a ``bisected`` runner to a function.

        Args:
            arguments: callable mapping the arguments of ``f.bisected(...)``
                to a sized, indexable sequence of ``(args, kwargs)`` pairs to
                search over.
            exceptions: iterable of exception types that mark a probe as
                failed; any other exception propagates to the caller.

        Returns:
            A decorator that sets ``f.bisected`` and returns ``f`` unchanged.
            ``f.bisected(*a, **kw)`` returns a generator function; calling it
            with the fixed arguments of ``f`` yields one ``BisectedRun`` per
            probe.  After each probe the search is sent ``True`` if the probe
            failed and ``False`` if it succeeded.
        """
        caught = tuple(exceptions)

        def dec(f):
            sig = signature(f)

            @wraps(arguments)
            def bisected(*arg_args, **arg_kwargs):
                candidates = arguments(*arg_args, **arg_kwargs)

                @wraps(f)
                def inner(*f_fixed_args, **f_fixed_kwargs):
                    # A fresh search per run keeps the runner reusable instead
                    # of sharing one half-consumed generator between runs.
                    ci = binary_search(candidates)
                    try:
                        f_args, f_kwargs = next(ci)
                    except StopIteration:
                        # Nothing to probe; a StopIteration escaping a
                        # generator body would surface as RuntimeError.
                        return
                    while True:
                        f_bound_args = sig.bind(
                            *f_fixed_args, *f_args,
                            **f_fixed_kwargs, **f_kwargs
                        )
                        try:
                            res = f(*f_bound_args.args, **f_bound_args.kwargs)
                        except caught as e:
                            # No return value exists for a failed probe.
                            res, exc = None, e
                        else:
                            exc = None
                        # Argument order follows the namedtuple fields:
                        # (signature, result, exception).
                        yield cls(f_bound_args, res, exc)
                        try:
                            f_args, f_kwargs = ci.send(exc is not None)
                        except StopIteration:
                            break
                return inner

            f.bisected = bisected
            return f
        return dec
@BisectedRun.support(
    arguments=lambda min_vmem, max_vmem:
        [((), {'vmem': m}) for m in arange(min_vmem, max_vmem, 1024, dtype=int)],
    exceptions={CalledProcessError},
)
def run_python_ulimit(code, vmem):
    """Run ``code`` in a fresh Python interpreter under a ``ulimit -v`` cap.

    Args:
        code: Python source handed to the child interpreter via ``-c``.
        vmem: value passed to ``ulimit -v`` in the child shell
            (NOTE(review): kibibytes in bash/dash -- confirm for other shells).

    Returns:
        The value returned by ``check_call`` (0 on success).

    Raises:
        CalledProcessError: if the shell command exits with a non-zero status.
    """
    import sys

    # Probe the interpreter running this script rather than whatever
    # ``python`` resolves to on PATH (possibly nothing, or another install);
    # a missing binary would otherwise look like an out-of-memory failure.
    interpreter = sys.executable or 'python'
    script = f'''(
        ulimit -v {vmem}
        {quote(interpreter)} -c {quote(code)}
    )'''
    return check_call(['sh', '-c', script], stderr=DEVNULL, stdout=DEVNULL)
def _banner(question):
    """Print ``question`` centred in a 120-column horizontal rule."""
    print(' {} '.format(question).center(120, '\N{box drawings light horizontal}'))


def _final_probe_limits(code):
    """Bisect ``ulimit -v`` for ``code``; return the last two probed limits.

    The search covers limits 0..2**20 via ``run_python_ulimit.bisected``.
    The result is ``[lower, upper]`` in ascending order.

    NOTE(review): callers treat ``lower`` as failing and ``upper`` as passing;
    nothing here checks that, so callers re-run at these limits to confirm.

    Raises:
        ValueError: if the bisection produced fewer than two probes.
    """
    last_pair = None
    for last_pair in nwise(run_python_ulimit.bisected(0, 2**20)(code)):
        pass
    if last_pair is None:
        raise ValueError('bisection produced fewer than two probes')
    prev_run, curr_run = last_pair
    return sorted([prev_run.args.vmem, curr_run.args.vmem])


def _case_numpy_import():
    """Case study: smallest ``ulimit -v`` under which ``import numpy`` works."""
    _banner('What is the minimum memory needed to `import numpy`?')
    code = dedent('''
        import numpy
    ''').strip()
    bad_mem, ok_mem = _final_probe_limits(code)
    # Confirm the bracket: the upper limit must pass, the lower must fail.
    try:
        run_python_ulimit(code, ok_mem)
    except Exception as e:
        raise ValueError(f'unanticipated failure with {ok_mem = }') from e
    try:
        run_python_ulimit(code, bad_mem)
    except CalledProcessError:
        pass  # expected: the child cannot run within the lower limit
    else:
        raise ValueError(f'unanticipated success with {bad_mem = }')
    print(f'Need `ulimit -v ≥{ok_mem}` to `import numpy`.')


def _case_attrdict():
    """Case study: compare two ``attrdict`` implementations under a memory cap."""
    _banner('What is the preferred implementation of `attrdict`?')
    attrdict_protocols = dedent('''
        # attrdict via dispatch to `dict.__*item__` protocols
        class attrdict(dict):
            __getattr__ = dict.__getitem__
            __setattr__ = dict.__setitem__
            __delattr__ = dict.__delitem__
    ''')
    attrdict_circular = dedent('''
        # attrdict via circular reference on `self.__dict__`
        class attrdict(dict):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.__dict__ = self
    ''')
    simple_test = dedent('''
        for _ in range(2):
            d = attrdict({idx: None for idx in range(100_000)})
    ''')
    sustained_test = dedent('''
        for _ in range(100):
            d = attrdict({idx: None for idx in range(100_000)})
    ''')

    _, attrdict_protocols_ok_mem = _final_probe_limits(
        '\n'.join([attrdict_protocols, simple_test]))
    _, attrdict_circular_ok_mem = _final_probe_limits(
        '\n'.join([attrdict_circular, simple_test]))
    # Raised explicitly so the check still runs under ``python -O``.
    if not isclose(attrdict_protocols_ok_mem, attrdict_circular_ok_mem, abs_tol=1024):
        raise AssertionError('memory usage in simple test should be close (≤1MiB)')

    code = '\n'.join([attrdict_protocols, sustained_test])
    try:
        run_python_ulimit(code, attrdict_protocols_ok_mem)
    except Exception as e:
        raise ValueError(f'unanticipated failure with {attrdict_protocols_ok_mem = } in sustained run') from e

    code = '\n'.join([attrdict_circular, sustained_test])
    try:
        run_python_ulimit(code, attrdict_circular_ok_mem)
    except CalledProcessError:
        pass  # expected: the sustained loop does not fit in the simple-test limit
    else:
        raise ValueError(f'unanticipated success with {attrdict_circular_ok_mem = } in sustained run')
    print(
        'Implementation with circular reference creates memory pressure as a result of cyclic garbage collection delay;',
        'fails under sustained test with allocations in tight loop.'
    )


def _case_numpy_leak():
    """Case study: object-dtype ``numpy.ndarray`` pairs that reference each other."""
    _banner("Is there a memory leak with circular reference of `numpy.ndarray` with `dtype='O'`?")
    core_code = dedent('''
        from numpy import array
        def create_normal_array(size=100_000):
            xs = array([None for _ in range(size)], dtype=object)
            ys = array([None for _ in range(size)], dtype=object)
            return xs, ys
        def create_circular_array(size=100_000):
            xs = array([None for _ in range(size)], dtype=object)
            ys = array([None for _ in range(size)], dtype=object)
            xs[0], ys[0] = ys, xs # circular reference
            return xs, ys
    ''')
    baseline_test = dedent('''
        for _ in range(2):
            xs, ys = create_normal_array()
    ''')
    sustained_test = dedent('''
        for _ in range(100):
            xs, ys = create_circular_array()
    ''')
    verification_test = dedent('''
        for _ in range(100):
            xs, ys = create_normal_array()
    ''')

    _, ok_mem = _final_probe_limits('\n'.join([core_code, baseline_test]))

    # Both messages below report this study's own limit, ``ok_mem``.
    code = '\n'.join([core_code, verification_test])
    try:
        run_python_ulimit(code, ok_mem)
    except Exception as e:
        raise ValueError(f'unanticipated failure with {ok_mem = } in sustained run') from e

    code = '\n'.join([core_code, sustained_test])
    try:
        run_python_ulimit(code, ok_mem)
    except CalledProcessError:
        pass  # expected: the circular variant exceeds the baseline limit
    else:
        raise ValueError(f'unanticipated success with {ok_mem = } in sustained run')
    print(
        "There is a memory leak with circular references in `numpy.ndarray` with `dtype='O'`",
        "resulting from missing `tp_traverse` implementation in PyArray_Type",
        '(https://github.com/numpy/numpy/blob/17d730ae32f5f60c9c2ca75d202b4e866debd686/numpy/core/src/multiarray/arrayobject.c#L1327)',
    )


if __name__ == '__main__':
    # Each case study is independent; comment out a call to skip it.
    #### Case Study: NumPy Import
    _case_numpy_import()
    #### Case Study: Attribute Dictionaries
    _case_attrdict()
    #### Case Study: NumPy Memory Leak
    _case_numpy_leak()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment