Created
July 30, 2012 16:32
-
-
Save anonymous/3208245 to your computer and use it in GitHub Desktop.
Module for the finalization of weakrefable objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Module for the finalization of weakrefable objects | |
finalize(obj, func, *args, **kwds) creates a callable finalizer object | |
for obj which can be called explicitly, or automatically when obj is | |
garbage collected. For example | |
>>> class Kenny: pass | |
... | |
>>> kenny = Kenny() | |
>>> finalize(kenny, print, "you killed kenny!") | |
<finalize.finalize object at 0xffed4f2c> | |
>>> del kenny | |
you killed kenny! | |
Calling the finalizer for the first time will invoke | |
func(*args, **kwds) and return the result. Calling it a second | |
time has no effect and simply returns None. | |
>>> def callback(): | |
... print("CALLBACK") | |
... return 123 | |
... | |
>>> kenny = Kenny() | |
>>> f = finalize(kenny, callback) | |
>>> assert f() is 123 | |
CALLBACK | |
>>> f() # no effect because finalizer already called | |
>>> del kenny # no effect because finalizer already called | |
At program shutdown, any remaining finalizers for which the atexit | |
attribute is true will be called in reverse order of creation. By | |
default the atexit attribute is false. | |
>>> kenny1 = Kenny() | |
>>> f1 = finalize(kenny1, print, "kenny1") | |
>>> kenny2 = Kenny() | |
>>> f2 = finalize(kenny2, print, "kenny2") | |
>>> f2.atexit = True # ensure callback invoked before exiting | |
>>> exit() | |
kenny2 | |
No finalizer callbacks are ever called after the functions registered | |
with the atexit module. This avoid problems with invoking callbacks | |
later on during interpreter shutdown. | |
''' | |
import sys | |
import weakref | |
import atexit | |
class finalize(object): | |
''' | |
Class for registering a callback to be called at garbage collection | |
''' | |
__slots__ = ('atexit', '_sortkey') | |
_registry = {} | |
_shutdown = False | |
_count = 0 | |
def __init__(self, obj, func, *args, **kwds): | |
self.atexit = False | |
self._sortkey = finalize._count | |
finalize._count += 1 | |
if obj is None: | |
wr = None | |
else: | |
wr = weakref.ref(obj, self._make_wrapper()) | |
self._registry[self] = (wr, func, args, kwds) | |
def __call__(self): | |
'''Invoke func(*args, **kwds) unless previously invoked''' | |
tmp = self._registry.pop(self, None) | |
if tmp and not self._shutdown: | |
_, func, args, kwds = tmp | |
return func(*args, **kwds) | |
def _make_wrapper(self): | |
'''Return a wrapper for self which can be used as weakref callback''' | |
def wrapper(wr): | |
try: | |
self() | |
except: | |
sys.stderr.write('Ignored exception in finalizer - ') | |
_print_stripped_exc(2) | |
return wrapper | |
# Print current exception with some frames stripped from traceback | |
def _print_stripped_exc(stripcount): | |
t, v, tb = sys.exc_info() | |
for i in range(stripcount): | |
if tb is None: | |
break | |
tb = tb.tb_next | |
if hasattr(v, '__traceback__'): | |
v.__traceback__ = tb | |
sys.excepthook(t, v, tb) | |
# At shutdown invoke finalizers for which atexit is true | |
def _atexit_callback(): | |
try: | |
L = [f for f in finalize._registry.keys() if f.atexit] | |
L.sort(key=lambda f : f._sortkey, reverse=True) | |
for f in L: | |
f._make_wrapper()(None) | |
finally: | |
# prevent any more finalizers from executing during shutdown | |
finalize._shutdown = True | |
finalize._registry.clear() | |
atexit.register(_atexit_callback) | |
if __name__ == '__main__': | |
class A: | |
pass | |
def foo(): | |
bar() | |
def bar(): | |
1/0 | |
L = [] | |
a = A() | |
finalize(a, L.append, "hello 1") | |
f2 = finalize(a, L.append, "hello 2") | |
f3 = finalize(a, L.append, "hello 3") | |
finalize(a, foo) | |
L.append("111") | |
f3() # => L.append("hello 3") | |
L.append("222") | |
f3() # nothing because previously called | |
L.append("333") | |
print("[[[You should see a traceback for an ignored ZeroDivisionError:]]]") | |
del a # => print traceback for ZeroDivisionError | |
# => L.append("hello 2") | |
# => L.append("hello 1") | |
L.append("444") | |
f2() # nothing because previously called | |
print(L) | |
assert L == [ | |
'111', | |
'hello 3', | |
'222', | |
'333', | |
'hello 2', | |
'hello 1', | |
'444' | |
] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment