All of the code necessary to implement and test protobuf projection in a Google Appengine web application.
import db_util | |
db_util.enable_db_protobuf_projection() | |
db_util.enable_ndb_protobuf_projection() |
# This is really a list, but we don't have a ThreadLocalList class. | |
current_protobuf_projection = thread_util.ThreadLocalDict() | |
@contextlib.contextmanager | |
def protobuf_projection(cls, property_names): | |
# Update the thread-local with the current projected properties | |
current_protobuf_projection.update(dict.fromkeys(property_names)) | |
# Monkey-patch this class's init to keep track of the projected property | |
# names. This monkey-patch only is reverted before leaving the context. | |
# Note this is only for db.Model classes. ndb.Model classes have a | |
# different approach to init, and we have to set this attribute in the | |
# _projected_pb_to_entity function, below. | |
if issubclass(cls, db.Model): | |
orig_init = cls.__init__ | |
def new_init(self, *args, **kwargs): | |
self._protobuf_projection_properties = property_names | |
return orig_init(self, *args, **kwargs) | |
cls.__init__ = new_init | |
try: | |
yield | |
finally: | |
if issubclass(cls, db.Model): | |
cls.__init__ = orig_init | |
current_protobuf_projection.clear() | |
def _create_projected_pb_to_entity(orig_pb_to_entity): | |
"""Removes properties from protobufs for faster queries. | |
Retrieving data from the GAE datastore happens in two stages: | |
1. Issuing an RPC to the datastore API, which returns a message in | |
protobuf format. | |
2. Translating the protobuf message into an instance of a db.Model | |
or a ndb.Model class. | |
The second stage is considerably slower than the first stage, primarily due | |
to a lot of validation that makes sure the raw data returned matches the | |
schema defined in the *.Model classes. This validation happens for *all* | |
of the properties defined by the *.Model class, regardless if you actually | |
care to read any of those properties, later. | |
So, to speed up the second stage, we remove the unnecessary properties from | |
the protobuf before entering the second stage. This is much like the | |
scenario where a new property is added to a Model. Entities that have not | |
been written since the new property was introduced will return a protobuf | |
that excludes the newly introduced property. | |
NOTE: Reading an attribute excluded from the projection from an entity | |
created during protobuf projection will raise an AttributeError. | |
NOTE: Trying to put() an entity created during protobuf projection will | |
raise an IOError. | |
""" | |
def _projected_pb_to_entity(self, pb): | |
if current_protobuf_projection: | |
def del_undesired_properties(lst): | |
for i in xrange(len(lst) - 1, -1, -1): | |
if lst[i].name() not in current_protobuf_projection.keys(): | |
del lst[i] | |
if pb.property_size() > 0: | |
del_undesired_properties(pb.property_list()) | |
if pb.raw_property_size() > 0: | |
del_undesired_properties(pb.raw_property_list()) | |
entity = orig_pb_to_entity(self, pb) | |
# Keep track of which properties were projected when this instance | |
# was created. For db.Model instances, we use a monkey-patched init | |
# to keep track of these properties, which is done in | |
# protobuf_projection(), above. | |
if current_protobuf_projection and isinstance(entity, ndb.Model): | |
entity._protobuf_projection_properties = ( | |
current_protobuf_projection.keys()) | |
return entity | |
return _projected_pb_to_entity | |
_orig_db_pb_to_entity = datastore.DatastoreAdapter.pb_to_entity | |
def _protobuf_projection_mock_getattribute(cls): | |
"""Prevent access to properties excluded from a projection query. | |
Monkey-patch this class's getattribute to raise an exception when | |
trying to access an unavailable property on an instance that was created | |
within a protobuf_projection context. | |
This monkey-patch for this function is always in effect, but it only | |
applies to objects that were created within the protobuf_projection | |
context. Those are the only objects that will have a | |
_protobuf_projection_properties attribute available. The code inside | |
this function must execute extremely fast, because it is called A LOT. | |
""" | |
orig_get_attribute = cls.__getattribute__ | |
projected_attribute_error_msg = ("Property unavailable due to protobuf " | |
"projection.") | |
projected_attribute_error = AttributeError(projected_attribute_error_msg) | |
def new_get_attribute(self, name): | |
try: | |
# Don't call hasattr(), since that'd be infinite recursion | |
projected_properties = super(cls, self).__getattribute__( | |
'_protobuf_projection_properties') | |
all_properties = super(cls, self).__getattribute__( | |
'_properties').keys() | |
if name in all_properties and name not in projected_properties: | |
raise projected_attribute_error | |
except AttributeError, ae: | |
if ae.message == projected_attribute_error_msg: | |
raise | |
else: | |
# This instances wasn't fetched during protobuf projection, | |
# since _protobuf_projection_properties is unavailable | |
pass | |
return orig_get_attribute(self, name) | |
cls.__getattribute__ = new_get_attribute | |
_protobuf_projection_disallowed_put_exception = IOError( | |
'Write disallowed due to protobuf projection.') | |
def _protobuf_projection_disallow_ndb_put(): | |
# Monkey-patch the various put() functions to raise an exception when | |
# trying to put an entity that was created within a protobuf_projection | |
# context. | |
def do_not_allow_puts(self, orig_func, *args, **kwargs): | |
if hasattr(self, '_protobuf_projection_properties'): | |
raise _protobuf_projection_disallowed_put_exception | |
return orig_func(self, *args, **kwargs) | |
orig_put = ndb.Model.put | |
ndb.Model.put = lambda self, *args, **kwargs: do_not_allow_puts(self, | |
orig_put, *args, **kwargs) | |
orig_put_async = ndb.Model.put_async | |
ndb.Model.put_async = lambda self, *args, **kwargs: do_not_allow_puts(self, | |
orig_put_async, *args, **kwargs) | |
def _protobuf_projection_disallow_db_put(): | |
orig_db_put_async = db.put_async | |
def new_db_put_async(models, **kwargs): | |
if not isinstance(models, (list, tuple)): | |
models = (models,) | |
if any([hasattr(m, '_protobuf_projection_properties') | |
for m in models]): | |
raise _protobuf_projection_disallowed_put_exception | |
return orig_db_put_async(models, **kwargs) | |
db.put_async = new_db_put_async | |
orig_model_put = db.Model.put | |
def new_db_model_put(self, *args, **kwargs): | |
if hasattr(self, '_protobuf_projection_properties'): | |
raise _protobuf_projection_disallowed_put_exception | |
return orig_model_put(self, *args, **kwargs) | |
db.Model.put = new_db_model_put | |
def enable_db_protobuf_projection(): | |
_protobuf_projection_mock_getattribute(db.Model) | |
_protobuf_projection_disallow_db_put() | |
datastore.DatastoreAdapter.pb_to_entity = _create_projected_pb_to_entity( | |
_orig_db_pb_to_entity) | |
_orig_ndb_pb_to_entity = ndb_model.ModelAdapter.pb_to_entity | |
def enable_ndb_protobuf_projection(): | |
_protobuf_projection_mock_getattribute(ndb.Model) | |
_protobuf_projection_disallow_ndb_put() | |
ndb_model.ModelAdapter.pb_to_entity = _create_projected_pb_to_entity( | |
_orig_ndb_pb_to_entity) |
class ProjectedProtobufQueryTest(gae_model.GAEModelTestCase): | |
def setUp(self): | |
super(ProjectedProtobufQueryTest, self).setUp() | |
db_util.enable_db_protobuf_projection() | |
db_util.enable_ndb_protobuf_projection() | |
@db_util.disable_ndb_memcache | |
class TestNDBModel(ndb.Model): | |
prop_a = ndb.TextProperty(indexed=False) | |
prop_b = ndb.IntegerProperty(indexed=True, default=-1) | |
self.test_ndb_class = TestNDBModel | |
class TestDBModel(db.Model): | |
prop_a = db.TextProperty(indexed=False) | |
prop_b = db.IntegerProperty(indexed=True, default=-1) | |
self.test_db_class = TestDBModel | |
def test_ndb_simple(self): | |
obj = self.test_ndb_class(prop_a="Hello world", prop_b=3) | |
key = obj.put() | |
# Get by key | |
with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']): | |
obj = key.get(use_cache=False) | |
self.assertEqual(obj.prop_a, "Hello world") | |
# Get a different projection by key | |
with db_util.protobuf_projection(self.test_ndb_class, ['prop_b']): | |
obj = key.get(use_cache=False) | |
self.assertEqual(obj.prop_b, 3) | |
# Get a different projection by key | |
with db_util.protobuf_projection(self.test_ndb_class, | |
['prop_a', 'prop_b']): | |
obj = key.get(use_cache=False) | |
self.assertEqual(obj.prop_a, "Hello world") | |
self.assertEqual(obj.prop_b, 3) | |
# Get by query | |
with db_util.protobuf_projection(self.test_ndb_class, ['prop_b']): | |
obj = self.test_ndb_class.query().filter( | |
self.test_ndb_class.prop_b == 3).get(use_cache=False) | |
self.assertEqual(obj.prop_b, 3) | |
def test_ndb_put_disallowed(self): | |
obj = self.test_ndb_class(prop_a="Hello world", prop_b=3) | |
key = obj.put() | |
def call_put(): | |
obj.put() | |
def call_put_async(): | |
obj.put_async() | |
def call_global_put(): | |
ndb.put_multi([obj]) | |
with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']): | |
obj = key.get(use_cache=False) | |
# Calling put() on an instance created during protobuf projection | |
# should always raise an IOError | |
self.assertRaises(IOError, call_put) | |
self.assertRaises(IOError, call_put_async) | |
self.assertRaises(IOError, call_global_put) | |
# But, calling put on an instance created by a regular datastore | |
# read should put() correctly. | |
obj = key.get(use_cache=False) | |
try: | |
call_put() | |
call_put_async() | |
call_global_put() | |
except IOError: | |
self.fail('IOError raised incorrectly.') | |
def test_ndb_excluded_attribute_reading_disallowed(self): | |
obj = self.test_ndb_class(prop_a="Hello world", prop_b=3) | |
key = obj.put() | |
# Reading an excluded property from an instance created during | |
# projection should always raise an AttributeError | |
with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']): | |
obj = key.get(use_cache=False) | |
self.assertRaises(AttributeError, lambda: obj.prop_b) | |
self.assertRaises(AttributeError, lambda: obj.prop_b) | |
# But reading an excluded property should be fine during regular | |
# datastore reads. | |
obj = key.get(use_cache=False) | |
try: | |
obj.prop_b | |
except AttributeError: | |
self.fail('AttributeError raised incorrectly.') | |
def test_db_simple(self): | |
obj = self.test_db_class(prop_a="Hello world", prop_b=3) | |
key = obj.put() | |
# Get by key | |
with db_util.protobuf_projection(self.test_db_class, ['prop_a']): | |
obj = db.get(key) | |
self.assertEqual(obj.prop_a, "Hello world") | |
# Get a different projection by key | |
with db_util.protobuf_projection(self.test_db_class, ['prop_b']): | |
obj = db.get(key) | |
self.assertEqual(obj.prop_b, 3) | |
# Get a different projection by key | |
with db_util.protobuf_projection(self.test_db_class, | |
['prop_a', 'prop_b']): | |
obj = db.get(key) | |
self.assertEqual(obj.prop_a, "Hello world") | |
self.assertEqual(obj.prop_b, 3) | |
# Get by query | |
with db_util.protobuf_projection(self.test_db_class, ['prop_b']): | |
obj = self.test_db_class.all().filter( | |
'prop_b =', 3).get() | |
self.assertEqual(obj.prop_b, 3) | |
def test_db_put_disallowed(self): | |
obj = self.test_db_class(prop_a="Hello world", prop_b=3) | |
key = obj.put() | |
def call_put(): | |
obj.put() | |
def call_global_put(): | |
db.put(obj) | |
db.put([obj]) | |
with db_util.protobuf_projection(self.test_db_class, ['prop_a']): | |
obj = db.get(key) | |
# Calling put() on an instance created during protobuf projection | |
# should always raise an IOError | |
self.assertRaises(IOError, call_put) | |
self.assertRaises(IOError, call_global_put) | |
# But, calling put on an instance created by a regular datastore | |
# read should put() correctly. | |
obj = db.get(key) | |
try: | |
call_put() | |
call_global_put() | |
except IOError: | |
self.fail('IOError raised incorrectly.') | |
def test_db_excluded_attribute_reading_disallowed(self): | |
obj = self.test_db_class(prop_a="Hello world", prop_b=3) | |
key = obj.put() | |
# Reading an excluded property from an instance created during | |
# projection should always raise an AttributeError | |
with db_util.protobuf_projection(self.test_db_class, ['prop_a']): | |
obj = db.get(key) | |
self.assertRaises(AttributeError, lambda: obj.prop_b) | |
self.assertRaises(AttributeError, lambda: obj.prop_b) | |
# But reading an excluded property should be fine during regular | |
# datastore reads. | |
obj = db.get(key) | |
try: | |
obj.prop_b | |
except AttributeError: | |
self.fail('AttributeError raised incorrectly.') |
projection = ['user_email', 'username', 'user_nickname', 'user_id', | |
'is_phantom', 'birthdate', 'may_be_child', 'restricted_domain', | |
'child_capabilities'] | |
with db_util.protobuf_projection(user_models.UserData, projection): | |
students_data = user_models.UserData.all().fetch(50) |
"""Utility functions to help write thread-safe code. | |
The main advantage of this code is that it knows whether the | |
dev-appserver is being run with 'threadsafe: true' or 'threadsafe: | |
false', and if it's false, it uses more efficient, but less | |
thread-safe, variants. (This 'knowing' is actually enforced in the | |
unittest.) | |
Therefore, don't use these methods if you are spawning threads | |
yourself; these are meant to be used for 'generic' GAE objects that do | |
not use threads unless we are using a threaded version of our webapp. | |
NOTE: This is imported by appengine_config.py, which is very | |
restricted in what it's safe to import. Only import python system | |
libs from this file. | |
""" | |
import os | |
import threading | |
import modules_util | |
_USES_THREADSAFE = modules_util.module_yaml(full_parse=False).get( | |
'threadsafe', False) | |
if _USES_THREADSAFE: | |
import UserDict | |
import threading | |
_THREAD_LOCAL_SUPER = threading.local | |
# Sadly, it's illegal to inherit from two C classes, and | |
# threading.local and dict are both written in C. So we have to | |
# use UserDict instead of dict to make ThreadLocalDict() work. | |
_DICT_SUPER = UserDict.UserDict | |
RLock = threading.RLock | |
else: | |
# No need for threadsafety if we don't see threadsafe: true in app.yaml. | |
_THREAD_LOCAL_SUPER = object | |
_DICT_SUPER = dict | |
# I have to do this context-manager the old-fashioned way since | |
# we do collections.defaultdict(threading.RLock) in cacheutil.py. | |
class RLock(object): | |
"""A no-op version of threading.RLock; can only be used with 'with'.""" | |
def __enter__(self): | |
pass | |
def __exit__(self, *args): | |
pass | |
class ThreadLocal(_THREAD_LOCAL_SUPER): | |
"""Equivalent to threading.local. | |
You can use this in two ways: make an instance of it to get a | |
thread-local dummy-object, or subclass it to get a thread-local | |
smartie-object. | |
Usage: | |
class MyClass(thread_util.ThreadLocal): | |
... | |
my_instance = MyClass() # note this is at the global level | |
my_random_container = thread_util.ThreadLocal() | |
Now my_instance will have a different version of MyClass() in every | |
thread. my_random_container will also be different in every thread. | |
The reason to use this instead of threading.local directly is that | |
we revert to 'object' when dev-appserver is run in non-threaded mode, | |
so you don't pay the cost of thread-local-ness if you don't have to. | |
""" | |
pass | |
class ThreadLocalDict(_DICT_SUPER, _THREAD_LOCAL_SUPER): | |
"""A threadsafe dict: each thread has its own version of it.""" | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment