Skip to content

Instantly share code, notes, and snippets.

@TimPansino
Created November 30, 2022 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TimPansino/8a3b4cb87d2a28fb2c0b8a1680069fff to your computer and use it in GitHub Desktop.
Save TimPansino/8a3b4cb87d2a28fb2c0b8a1680069fff to your computer and use it in GitHub Desktop.
Python threadsafe WeakValueDictionary and WeakKeyDictionary iteration
"""
This file explores differences in behavior with safely or unsafely iterating on weakref's dictionary types and collections.OrderedDict vs the builtin dictionary type.
Tested with CPython 2.7, CPython 3.7 - CPython 3.11.
Conclusion:
- The dict type can be threadsafe to iterate over keys, values, or items if iterating on a shallow copy (dict.copy())
or by passing the corresponding method to a list or tuple constructor (list(dict.items())).
- collections.OrderedDict on CPython 3.7+ works exactly the same as builtin dict (due to being a subclass). Both guards will prevent issues.
- collections.OrderedDict on CPython 2.7 implements both items() and copy() in a way that directly iterates on itself, causing KeyErrors if sizes change.
Neither guard will work to allow indirect iteration.
- A WeakValueDictionary (and presumably WeakKeyDictionary) is NOT safe to iterate on using any of the above methods,
due to the presence of an internal dict (self.data) that is not guarded using any of these types of iteration guards.
- A WeakValueDictionary is not safe to copy, whether shallow or deep copying, under any circumstances where items may
be added or removed from the dict using normal method calls. (ie. using pop(), not by references being garbage collected)
- A WeakValueDictionary can be safely iterated on ONLY by iterating indirectly on the value references in the internal dict.
(eg. d.valuerefs() or list(d.itervaluerefs()), which are identical)
- Iterating on weakrefs requires manual dereferencing, checking liveness, and retrieving keys (as iteration is done on values not items).
Full example:
    for wr in d.valuerefs():  # wr is the weak reference object, which must be dereferenced before use
        k = wr.key  # dict key
        v = wr()  # the value, or None if it no longer exists
        if v is None:
            # The referent has been garbage collected, so the weakref is dead. Ignore.
            continue
Full CPython implementation of weakref is available here, with the class definition of WeakValueDictionary highlighted.
- https://github.com/python/cpython/blob/5458b7e39eb41b146c650b76e04ac67213138a82/Lib/weakref.py#L92
- Note the use of iteration in copy methods and the use of _IterationGuard to prevent garbage collection causing iteration issues.
"""
import collections
import random
import threading
import weakref
from copy import deepcopy
# ---------------------------------------------------------------------------
# Experiment configuration
# ---------------------------------------------------------------------------

# Seconds the iterating thread is given (used as the join timeout in main())
# before shutdown is requested.
ITERATION_TIME = 10

# Size bounds for the shared dictionary: DICT_MAX_SIZE objects are created up
# front, and (DICT_MAX_SIZE - DICT_MIN_SIZE) keys are popped and re-added on
# every pass of the mutating thread.
DICT_MIN_SIZE = 20
DICT_MAX_SIZE = 40

# Dictionary implementation under test. Leave exactly one assignment active.
# DICT_TYPE = dict
# DICT_TYPE = collections.OrderedDict
DICT_TYPE = weakref.WeakValueDictionary
class A(object):
    """Empty placeholder type whose instances are used as dictionary values.

    The class carries no state or behavior; instances only need to be
    distinct objects that can be stored in the dictionary under test
    (including a weakref.WeakValueDictionary, which requires values that
    support weak references).
    """
# Strong references to every value object. For a WeakValueDictionary these
# references are what keep the dictionary entries alive.
obj_refs = [A() for _ in range(DICT_MAX_SIZE)]

# The shared dictionary under test, keyed by each object's index in obj_refs.
d = DICT_TYPE()
for position, instance in enumerate(obj_refs):
    d[position] = instance
def change_dict_size(shutdown):
    """Keep changing the size of the shared dictionary ``d`` until told to stop.

    Intended to run in its own thread so that a concurrent iterator over ``d``
    is exposed to "dictionary changed size during iteration" RuntimeErrors.
    Each pass performs two independent kinds of churn (comment out either
    one to study the other in isolation):

    1. Pop the first ``DICT_MAX_SIZE - DICT_MIN_SIZE`` keys, then insert them
       again from ``obj_refs``.
    2. Replace each entry of ``obj_refs`` with a brand-new ``A()`` with a
       1-in-10 chance. Dropping the old object removes the last strong
       reference held here, which lets a weak-value dictionary discard the
       corresponding entry.

    Args:
        shutdown: a threading.Event-like object; the loop exits once
            ``shutdown.is_set()`` is true (checked at the start of each pass).
    """
    churn_keys = range(DICT_MAX_SIZE - DICT_MIN_SIZE)

    while not shutdown.is_set():
        # Churn 1: explicit removal followed by re-insertion.
        for key in churn_keys:
            d.pop(key, None)
        for key in churn_keys:
            d[key] = obj_refs[key]

        # Churn 2: randomly swap out strong references.
        # Index-based on purpose: binding the old object to a loop variable
        # would keep it alive longer and change when it gets collected.
        for slot in range(len(obj_refs)):
            if random.randint(0, 9) == 0:
                obj_refs[slot] = A()
def iterate_dict_items(shutdown):
    """Loop over ``d.items()`` directly, with no guard, until shutdown.

    According to the experiment notes in this file, this variant crashes for
    every dictionary type because the dictionary may change size while it is
    being iterated (unguarded size changes).

    Args:
        shutdown: a threading.Event-like object; checked before each full
            pass over the dictionary.
    """
    total = 0  # throwaway accumulator so the loop body does some work
    while not shutdown.is_set():
        for key, _value in d.items():
            total += key
def iterate_dict_list(shutdown):
    """Loop over a list snapshot of ``d.items()`` until shutdown.

    Outcomes reported by the experiment notes in this file:

    - dict: appears never to fail; the list constructor seemingly does not
      iterate builtin dicts in a way that can observe a size change.
    - WeakValueDictionary, keys added/removed: fails, because the underlying
      dict (``self.data``) changes size while the list constructor consumes
      the items.
    - WeakValueDictionary, referents collected: no issue, thanks to the
      internal use of ``_weakrefset._IterationGuard``.

    Wrapping in ``tuple(...)`` instead of ``list(...)`` behaves the same.

    Args:
        shutdown: a threading.Event-like object; checked before each full
            pass over the dictionary.
    """
    total = 0  # throwaway accumulator so the loop body does some work
    while not shutdown.is_set():
        for key, _value in list(d.items()):
            total += key
def iterate_dict_copy(shutdown):
    """Loop over the items of a shallow copy of ``d`` until shutdown.

    Outcomes reported by the experiment notes in this file:

    - dict: never fails; the copy is a separate dict whose size is fixed,
      and copying does not iterate.
    - WeakValueDictionary, keys added/removed: fails, because ``copy()`` and
      ``__copy__()`` iterate the underlying dict (``self.data``) without a
      guard while it changes size.
    - WeakValueDictionary, referents collected: no issue, thanks to the
      internal use of ``_weakrefset._IterationGuard``.

    Args:
        shutdown: a threading.Event-like object; checked before each full
            pass over the dictionary.
    """
    total = 0  # throwaway accumulator so the loop body does some work
    while not shutdown.is_set():
        for key, _value in d.copy().items():
            total += key
def iterate_dict_deep_copy(shutdown):
    """Loop over the items of a deep copy of ``d`` until shutdown.

    Outcomes reported by the experiment notes in this file:

    - dict: never fails; the copy is a separate dict whose size is fixed,
      and deep copying does not iterate.
    - WeakValueDictionary, keys added/removed: fails, because
      ``__deepcopy__()`` iterates the underlying dict (``self.data``)
      without a guard while it changes size.
    - WeakValueDictionary, referents collected: no issue, thanks to the
      internal use of ``_weakrefset._IterationGuard``.

    Args:
        shutdown: a threading.Event-like object; checked before each full
            pass over the dictionary.
    """
    total = 0  # throwaway accumulator so the loop body does some work
    while not shutdown.is_set():
        for key, _value in deepcopy(d).items():
            total += key
def iterate_dict_itervaluerefs(shutdown):
    """Loop over ``d.itervaluerefs()`` directly until shutdown.

    Only meaningful for WeakValueDictionary; a plain dict has no
    ``itervaluerefs`` method.

    Outcomes reported by the experiment notes in this file:

    - Keys added/removed: fails, since this iterates ``self.data.values()``
      directly while that dict changes size. The usual guards (``list()``,
      ``tuple()`` or ``copy()``) fix it.
    - Referents collected: no issue, thanks to the internal use of
      ``_weakrefset._IterationGuard``.

    Caveats when working with the yielded weak references:

    - Each one must be called to obtain the value, and the result checked
      for ``None`` in case the referent has already been collected.
    - The dictionary key is not yielded; it is read from the reference's
      ``key`` attribute (WeakValueDictionary stores KeyedRef objects).

    Args:
        shutdown: a threading.Event-like object; checked before each full
            pass over the dictionary.
    """
    total = 0  # throwaway accumulator so the loop body does some work
    while not shutdown.is_set():
        for ref in d.itervaluerefs():
            key = ref.key
            value = ref()  # strong reference, or None if the referent is gone
            if value is not None:
                total += key
def iterate_dict_valuerefs(shutdown):
    """Loop over the list returned by ``d.valuerefs()`` until shutdown.

    Only meaningful for WeakValueDictionary; a plain dict has no
    ``valuerefs`` method.

    Outcomes reported by the experiment notes in this file:

    - Keys added/removed: no issues. Iteration is indirect, over
      ``list(self.data.values())``, so size changes of the underlying dict
      cannot affect the loop.
    - Referents collected: no issues, for the same reason.

    ``list(d.itervaluerefs())`` or ``tuple(d.itervaluerefs())`` are reported
    to work identically.

    Caveats when working with the returned weak references:

    - Each one must be called to obtain the value, and the result checked
      for ``None`` in case the referent has already been collected.
    - The dictionary key is not returned; it is read from the reference's
      ``key`` attribute (WeakValueDictionary stores KeyedRef objects).

    Args:
        shutdown: a threading.Event-like object; checked before each full
            pass over the dictionary.
    """
    total = 0  # throwaway accumulator so the loop body does some work
    while not shutdown.is_set():
        for ref in d.valuerefs():
            key = ref.key
            value = ref()  # strong reference, or None if the referent is gone
            if value is not None:
                total += key
def main():
    """Run one mutating thread against one iterating thread.

    The mutator (``change_dict_size``) runs concurrently with the selected
    iteration strategy for at most ``ITERATION_TIME`` seconds. After that, or
    as soon as the iterating thread dies, both threads are asked to stop via
    a shared event and are joined.

    Shutdown is requested in a ``finally`` block so that an interruption
    while waiting (for example Ctrl-C) or a failure to start the iterating
    thread cannot leave the non-daemon worker threads spinning forever.
    """
    shutdown = threading.Event()

    # Pick exactly one iteration strategy to exercise.
    # iterate = iterate_dict_items
    # iterate = iterate_dict_list
    # iterate = iterate_dict_copy
    # iterate = iterate_dict_deep_copy
    # iterate = iterate_dict_itervaluerefs
    iterate = iterate_dict_valuerefs

    t1 = threading.Thread(target=change_dict_size, args=(shutdown,))
    t2 = threading.Thread(target=iterate, args=(shutdown,))

    t1.start()
    try:
        t2.start()
        # Run for ITERATION_TIME seconds (or until t2 exits), then shut down.
        t2.join(timeout=ITERATION_TIME)
    finally:
        shutdown.set()
        t1.join()
        # A thread that was never started cannot be joined; ident stays None
        # until start() has succeeded.
        if t2.ident is not None:
            t2.join()
# Run the experiment only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment