Skip to content

Instantly share code, notes, and snippets.

@mvantellingen mvantellingen/cache.py
Last active Mar 15, 2018

Embed
What would you like to do?
Wagtail StreamField Optimized
import inspect
import logging
from collections import defaultdict
from wagtail.core import blocks
logger = logging.getLogger(__name__)
class _FieldCacheManager():
def __init__(self):
self._cache = {}
def get(self, cls, value):
if cls in self._cache:
return self._cache[cls].get(value)
def preload(self, block, data):
items = self._analyze_data(block, data)
self._preload_data(items)
def _analyze_data(self, block, data):
result = defaultdict(set)
for block_data in data:
child_result = self._analyze_data_item(block, block_data)
self._merge_dict_sets(result, child_result)
return result
def _analyze_data_item(self, block, data):
result = defaultdict(set)
for name, child_block in block.child_blocks.items():
value = data.get(name, child_block.get_default())
if hasattr(child_block, 'child_block'):
for val in value:
child_result = self._analyze_data_item(child_block.child_block, val)
self._merge_dict_sets(result, child_result)
elif hasattr(child_block, 'child_blocks'):
child_result = self._analyze_data_item(child_block, value)
self._merge_dict_sets(result, child_result)
elif isinstance(child_block, blocks.ChooserBlock) and value:
result[child_block.__class__].add(value)
return result
def _preload_data(self, data):
for cls, values in data.items():
obj = cls()
self._cache[cls] = obj.target_model.objects.in_bulk(values)
def _merge_dict_sets(self, target, other):
for key, value in other.items():
target[key].update(value)
class CacheStructBlock(blocks.StructBlock):
def bulk_to_python(self, value):
cm = _FieldCacheManager()
cm.preload(self, list(value))
return [self.to_python(val, cm) for val in value]
def to_python(self, value, cache_manager=None):
result = []
for name, child_block in self.child_blocks.items():
if name not in value:
# NB the result of get_default is NOT passed through to_python,
# as it's expected to be in the block's native type already
result.append((name, child_block.get_default()))
continue
child_val = value[name]
if not cache_manager:
result.append((name, child_block.to_python(child_val)))
continue
# If the child_block expects the cache_manager to be passed then
# use that by default. Otherwise we try to do the relevant
# conversion here if that is possible.
argspec = inspect.getargspec(child_block.to_python)
if len(argspec.args) > 2:
python_value = child_block.to_python(child_val, cache_manager)
elif isinstance(child_block, blocks.ChooserBlock):
python_value = cache_manager.get(child_block.__class__, child_val)
elif isinstance(child_block, blocks.ListBlock):
argspec = inspect.getargspec(child_block.child_block.to_python)
if len(argspec.args) > 2:
python_value = [
child_block.child_block.to_python(item, cache_manager)
for item in child_val
]
else:
logger.debug("Unable to use optimized call for %s", child_block)
python_value = [
child_block.child_block.to_python(item)
for item in child_val
]
elif isinstance(child_block, blocks.StructBlock):
logger.debug("Unable to use optimized call for %s", child_block)
python_value = child_block.to_python(child_val)
else:
python_value = child_block.to_python(child_val)
result.append((name, python_value))
return blocks.StructValue(self, result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.