Skip to content

Instantly share code, notes, and snippets.

@piroyoung
Last active January 17, 2018 16:14
Show Gist options
  • Save piroyoung/a012f527ba92bb2d439300039fff893c to your computer and use it in GitHub Desktop.
Save piroyoung/a012f527ba92bb2d439300039fff893c to your computer and use it in GitHub Desktop.
定期的にリークしてそうなオブジェクトを殺すマン.別スレッドでインスタンス作ってqueueに積む実装にするとなお良さそう
import threading
import contextlib
class LeakObject:
    """Hand out a shared object and rebuild it after a number of uses.

    The wrapped object is created by calling ``cls(**kwargs)``. Each use
    of :meth:`instance` is counted; once the count exceeds ``lifetime``
    the object is replaced by a freshly constructed one and the count is
    reset, so state accumulated by the old object is dropped together
    with the reference held here.
    """

    def __init__(self, lifetime: int, cls: type, **kwargs):
        """Create the first wrapped object.

        Args:
            lifetime: Number of uses tolerated before replacement; the
                object is rebuilt at the end of the use that pushes the
                counter above this value (i.e. after ``lifetime + 1``
                uses).
            cls: Class (or any callable factory) used to build the
                wrapped object.
            **kwargs: Keyword arguments forwarded to ``cls`` on every
                construction.
        """
        self._counter = 0
        self.lifetime = lifetime
        self._kwargs = kwargs
        self._cls = cls
        self._obj = cls(**kwargs)
        self._lock = threading.Lock()

    @contextlib.contextmanager
    def instance(self):
        """Yield the current wrapped object and count this use.

        When the ``with`` body finishes -- normally or by raising -- the
        usage counter is compared with ``lifetime`` and, if exceeded, the
        wrapped object is replaced for subsequent callers.

        Yields:
            The object that was current when the context was entered.
        """
        with self._lock:
            self._counter += 1
            current = self._obj
        try:
            yield current
        finally:
            # Test and reset under the same lock acquisition so that two
            # threads cannot both observe an expired counter and both
            # rebuild the object.
            with self._lock:
                if self._counter > self.lifetime:
                    self._counter = 0
                    self._obj = self._cls(**self._kwargs)
class A:
    """Tiny demo class whose ``dat`` list grows with every ``append`` call."""

    def __init__(self):
        # Announce construction so object re-creation is visible on stdout.
        print('generated')
        self.dat = list()

    def append(self):
        """Add a single ``1`` to the end of ``dat``."""
        self.dat.extend((1,))

    def hello(self):
        """Return the fixed greeting string."""
        greeting = 'hello'
        return greeting
# Demo: wrap A so that it is rebuilt once its use count exceeds 8.
l = LeakObject(lifetime=8, cls=A)

# Borrow the current instance, mutate it, and show the accumulated data.
with l.instance() as a:
    a.append()
    print(a.dat)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment