Created
February 11, 2023 14:11
-
-
Save Yiling-J/8505fa0d91df7b7d4d4b33c931cf28e8 to your computer and use it in GitHub Desktop.
cached_property
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import cached_property | |
from threading import Thread | |
from random import randint | |
import threading | |
import time | |
_NOT_FOUND = object() | |
class cached_property_new(cached_property): | |
def __get__(self, instance, owner=None): | |
if instance is None: | |
return self | |
if self.attrname is None: | |
raise TypeError( | |
"Cannot use cached_property instance without calling __set_name__ on it." | |
) | |
try: | |
cache = instance.__dict__ | |
except AttributeError: # not all objects have __dict__ (e.g. class defines slots) | |
msg = ( | |
f"No '__dict__' attribute on {type(instance).__name__!r} " | |
f"instance to cache {self.attrname!r} property." | |
) | |
raise TypeError(msg) from None | |
val = cache.get(self.attrname, _NOT_FOUND) | |
if val is _NOT_FOUND: | |
sentinel = threading.Event() | |
event = cache.setdefault(f"__{self.attrname}_event", sentinel) | |
if event is sentinel: | |
val = self.func(instance) | |
try: | |
cache[self.attrname] = val | |
event.set() | |
except TypeError: | |
msg = ( | |
f"The '__dict__' attribute on {type(instance).__name__!r} instance " | |
f"does not support item assignment for caching {self.attrname!r} property." | |
) | |
raise TypeError(msg) from None | |
finally: | |
cache.pop(f"__{self.attrname}_event") | |
else: | |
event.wait() | |
val = cache[self.attrname] | |
return val | |
class Spam: | |
@cached_property_new | |
def ham(self): | |
print(f"Calculating amount of ham in {self}") | |
time.sleep(10) | |
return randint(0, 100) | |
def get_ham(self): | |
print(f"The amount of ham in is {self.ham}") | |
def bacon(): | |
spam = Spam() | |
print(f"The amount of ham in {spam} is {spam.ham}") | |
print(f"The amount of ham in {spam} is {spam.ham} again") | |
start = time.time() | |
threads = [] | |
for _ in range(3): | |
t = Thread(target=bacon) | |
threads.append(t) | |
t.start() | |
for t in threads: | |
t.join() | |
spam = Spam() | |
threads = [] | |
for _ in range(100): | |
t = Thread(target=spam.get_ham) | |
threads.append(t) | |
t.start() | |
for t in threads: | |
t.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment