Created
March 4, 2020 14:05
-
-
Save M0r13n/0618dd7aca5c0fb34bfce43f6760532b to your computer and use it in GitHub Desktop.
A simple self cleaning Python dictionary (dict) with timestamps.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
class SelfCleaningDict(dict): | |
def __init__(self, time_out=3600, prone_interval=None): | |
self.timeout = time_out | |
self.prone_interval = time_out // 10 or prone_interval | |
self.last_prone = None | |
if not self.timeout: | |
raise ValueError("Timeout cannot be None") | |
super().__init__() | |
def _prone(self): | |
"""> | |
Remove all items that timed out. | |
Keys are ordered by last change. | |
So we iterate over the first n keys, removing timed out values, until we hit the first still active item. | |
Considerations: | |
For many items it might make sense to not use list(self.items) | |
and rather use a generator to only iterate over the first x items. | |
But this would require some (but not much) more effort if done efficiently. | |
""" | |
cur = time.time() | |
for key, value in list(self.items()): | |
if (value[1] + self.timeout) < cur: | |
del self[key] | |
else: | |
break | |
self.last_prone = cur | |
def __setitem__(self, key, value): | |
cur = time.time() | |
v = (value, cur) | |
super().__setitem__(key, v) | |
if self.prone_interval and self.last_prone and cur < self.last_prone + self.prone_interval: | |
self._prone() | |
def __getitem__(self, key): | |
v = super().__getitem__(key) | |
if v: | |
v = v[0] | |
return v | |
def __delitem__(self, key): | |
super().__delitem__(key) | |
if __name__ == "__main__": | |
c = SelfCleaningDict(time_out=1) | |
c['a'] = 1 | |
c['b'] = 2 | |
c['c'] = 3 | |
print(c, c.timeout, c['a']) | |
import timeit | |
import random, string | |
import cProfile | |
k_v_pairs = [(x, ''.join(random.choice(string.ascii_uppercase) for _ in range(20))) for x in range(1000)] | |
d = dict() | |
def normal(): | |
for k, v in k_v_pairs: | |
d[k] = v | |
d.clear() | |
return d | |
def new(): | |
for k, v in k_v_pairs: | |
d[k] = v | |
d.clear() | |
return d | |
print("NORMAL: ", timeit.timeit(normal, number=1000)) | |
d = SelfCleaningDict() | |
print("NEW: ", timeit.timeit(new, number=1000)) | |
# for 1000 items the new method is ~ 10 times slower | |
pr = cProfile.Profile() | |
pr.enable() | |
timeit.timeit(new, number=1000) | |
pr.disable() | |
pr.print_stats(sort='time') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment