-
-
Save gst/63cfa3f7b757a1f1e7c22fe4a231c470 to your computer and use it in GitHub Desktop.
cached_property
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import cached_property | |
from threading import Thread | |
from random import randint | |
import threading | |
import time | |
_NOT_FOUND = object() | |
class cached_property_new(cached_property): | |
def __get__(self, instance, owner=None): | |
if instance is None: | |
return self | |
if self.attrname is None: | |
raise TypeError( | |
"Cannot use cached_property instance without calling __set_name__ on it." | |
) | |
try: | |
cache = instance.__dict__ | |
except AttributeError: # not all objects have __dict__ (e.g. class defines slots) | |
msg = ( | |
f"No '__dict__' attribute on {type(instance).__name__!r} " | |
f"instance to cache {self.attrname!r} property." | |
) | |
raise TypeError(msg) from None | |
val = cache.get(self.attrname, _NOT_FOUND) | |
if val is not _NOT_FOUND: | |
return val | |
val = cache.get(f"__{self.attrname}_result", None) | |
if val is not None: | |
isok, val = val | |
else: | |
sentinel = threading.Event() | |
thread, event = cache.setdefault(f"__{self.attrname}_event", (threading.current_thread(), sentinel)) | |
def readonly(): | |
msg = ( | |
f"The '__dict__' attribute on {type(instance).__name__!r} instance " | |
f"does not support item assignment for caching {self.attrname!r} property." | |
) | |
raise TypeError(msg) from None | |
if event is sentinel: | |
has_err = True | |
try: | |
val = cache.get(self.attrname, _NOT_FOUND) | |
if val is not _NOT_FOUND: | |
return val | |
val = cache.get(f"__{self.attrname}_result", None) | |
if val is not None: | |
isok, val = val | |
assert not isok | |
raise val | |
try: | |
cache[f"__{self.attrname}_result"] = None | |
except TypeError as err: | |
has_err = err | |
readonly() | |
else: | |
try: | |
val = self.func(instance) | |
except BaseException as err: | |
has_err = err | |
cache[f"__{self.attrname}_result"] = (False, err) | |
raise | |
else: | |
cache[f"__{self.attrname}_result"] = (True, val) | |
cache[self.attrname] = val | |
has_err = None | |
return val | |
finally: | |
cache.pop(f"__{self.attrname}_event") | |
event.set() | |
if has_err is None: | |
cache.pop(f"__{self.attrname}_result") | |
else: | |
if thread is threading.current_thread(): | |
return self.func(instance) | |
event.wait() | |
val = cache.get(f"__{self.attrname}_result", None) | |
if val is None: | |
val = cache.get(self.attrname, _NOT_FOUND) | |
if val is _NOT_FOUND: | |
readonly() | |
return val | |
isok, val = val | |
if not isok: | |
raise val from None | |
return val | |
class Spam: | |
@cached_property_new | |
def ham(self): | |
print(f"Calculating amount of ham in {self}") | |
time.sleep(10) | |
return randint(0, 100) | |
def get_ham(self): | |
print(f"The amount of ham in is {self.ham}") | |
def bacon(): | |
spam = Spam() | |
print(f"The amount of ham in {spam} is {spam.ham}") | |
print(f"The amount of ham in {spam} is {spam.ham} again") | |
def main(): | |
start = time.time() | |
threads = [] | |
for _ in range(3): | |
t = Thread(target=bacon) | |
threads.append(t) | |
t.start() | |
for t in threads: | |
t.join() | |
spam = Spam() | |
threads = [] | |
for _ in range(8): | |
t = Thread(target=spam.get_ham) | |
threads.append(t) | |
t.start() | |
for t in threads: | |
t.join() | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment