Skip to content

Instantly share code, notes, and snippets.

@MattFaus
Last active August 3, 2018 12:28
Show Gist options
  • Save MattFaus/bd8c0a1a4fb3d7cc3fe3 to your computer and use it in GitHub Desktop.
Save MattFaus/bd8c0a1a4fb3d7cc3fe3 to your computer and use it in GitHub Desktop.
All of the code necessary to implement and test protobuf projection in a Google Appengine web application.
import db_util
# Install the protobuf-projection monkey-patches for both the db and the ndb
# datastore APIs.
db_util.enable_db_protobuf_projection()
db_util.enable_ndb_protobuf_projection()
# Names of the properties the active protobuf_projection() context wants to
# keep. Only the keys are meaningful (protobuf_projection() stores None as
# every value): this is really a list, but we don't have a ThreadLocalList
# class.
current_protobuf_projection = thread_util.ThreadLocalDict()
@contextlib.contextmanager
def protobuf_projection(cls, property_names):
    """Limit entities read inside the `with` block to `property_names`.

    While the context is active, `property_names` are added to
    current_protobuf_projection, which the patched pb_to_entity functions
    consult to strip every other property from incoming protobufs.

    Arguments:
        cls: the model class being read. Only used for db.Model subclasses,
            whose __init__ is temporarily wrapped so that new instances
            remember which properties were projected.
        property_names: iterable of property names to keep.

    On exit the projection that was active before entering is restored, so
    contexts may be nested without the inner one wiping the outer one.

    NOTE: the __init__ patch is applied to the class itself, so it is
    visible to every thread for the lifetime of the context.
    """
    # Materialize once: the names are needed both for the thread-local and
    # by new_init, so a one-shot iterable must not be consumed twice.
    property_names = list(property_names)

    # Snapshot the enclosing projection (if any) so it can be restored.
    previous_projection = dict(current_protobuf_projection.items())

    orig_init = None
    try:
        # Update the thread-local with the current projected properties
        current_protobuf_projection.update(dict.fromkeys(property_names))

        # Monkey-patch this class's init to keep track of the projected
        # property names. The monkey-patch is reverted before leaving the
        # context. Note this is only for db.Model classes. ndb.Model classes
        # have a different approach to init, and we have to set this
        # attribute in the _projected_pb_to_entity function instead.
        if issubclass(cls, db.Model):
            orig_init = cls.__init__

            def new_init(self, *args, **kwargs):
                self._protobuf_projection_properties = property_names
                return orig_init(self, *args, **kwargs)

            cls.__init__ = new_init

        yield
    finally:
        # Runs even if patching above failed part-way, so neither the class
        # nor the thread-local is left in a half-modified state.
        if orig_init is not None:
            cls.__init__ = orig_init
        current_protobuf_projection.clear()
        current_protobuf_projection.update(previous_projection)
def _create_projected_pb_to_entity(orig_pb_to_entity):
    """Removes properties from protobufs for faster queries.

    Retrieving data from the GAE datastore happens in two stages:
        1. Issuing an RPC to the datastore API, which returns a message in
           protobuf format.
        2. Translating the protobuf message into an instance of a db.Model
           or a ndb.Model class.

    The second stage is considerably slower than the first stage, primarily
    due to a lot of validation that makes sure the raw data returned matches
    the schema defined in the *.Model classes. This validation happens for
    *all* of the properties defined by the *.Model class, regardless of
    whether you actually care to read any of those properties later.

    So, to speed up the second stage, we remove the unnecessary properties
    from the protobuf before entering the second stage. This is much like
    the scenario where a new property is added to a Model. Entities that
    have not been written since the new property was introduced will return
    a protobuf that excludes the newly introduced property.

    Arguments:
        orig_pb_to_entity: the unpatched adapter method, called as
            orig_pb_to_entity(adapter, pb).

    Returns:
        A replacement method with the same (self, pb) signature.

    NOTE: The filter looks only at current_protobuf_projection, not at the
    entity's kind, so it applies to every protobuf converted while a
    projection is active.

    NOTE: Reading an attribute excluded from the projection from an entity
    created during protobuf projection will raise an AttributeError.

    NOTE: Trying to put() an entity created during protobuf projection will
    raise an IOError.
    """
    def _projected_pb_to_entity(self, pb):
        if current_protobuf_projection:
            def del_undesired_properties(lst):
                # Walk backwards so that deleting by index never shifts an
                # element we have yet to visit.
                for i in xrange(len(lst) - 1, -1, -1):
                    # Test membership on the mapping itself; calling .keys()
                    # here would build a new list for every property.
                    if lst[i].name() not in current_protobuf_projection:
                        del lst[i]

            if pb.property_size() > 0:
                del_undesired_properties(pb.property_list())

            if pb.raw_property_size() > 0:
                del_undesired_properties(pb.raw_property_list())

        entity = orig_pb_to_entity(self, pb)

        # Keep track of which properties were projected when this instance
        # was created. For db.Model instances, we use a monkey-patched init
        # to keep track of these properties, which is done in
        # protobuf_projection().
        if current_protobuf_projection and isinstance(entity, ndb.Model):
            entity._protobuf_projection_properties = (
                current_protobuf_projection.keys())

        return entity

    return _projected_pb_to_entity
# The db adapter method as it is when this module is first loaded.
# enable_db_protobuf_projection() always wraps this saved reference rather
# than whatever is currently installed on DatastoreAdapter.
_orig_db_pb_to_entity = datastore.DatastoreAdapter.pb_to_entity
def _protobuf_projection_mock_getattribute(cls):
"""Prevent access to properties excluded from a projection query.
Monkey-patch this class's getattribute to raise an exception when
trying to access an unavailable property on an instance that was created
within a protobuf_projection context.
This monkey-patch for this function is always in effect, but it only
applies to objects that were created within the protobuf_projection
context. Those are the only objects that will have a
_protobuf_projection_properties attribute available. The code inside
this function must execute extremely fast, because it is called A LOT.
"""
orig_get_attribute = cls.__getattribute__
projected_attribute_error_msg = ("Property unavailable due to protobuf "
"projection.")
projected_attribute_error = AttributeError(projected_attribute_error_msg)
def new_get_attribute(self, name):
try:
# Don't call hasattr(), since that'd be infinite recursion
projected_properties = super(cls, self).__getattribute__(
'_protobuf_projection_properties')
all_properties = super(cls, self).__getattribute__(
'_properties').keys()
if name in all_properties and name not in projected_properties:
raise projected_attribute_error
except AttributeError, ae:
if ae.message == projected_attribute_error_msg:
raise
else:
# This instances wasn't fetched during protobuf projection,
# since _protobuf_projection_properties is unavailable
pass
return orig_get_attribute(self, name)
cls.__getattribute__ = new_get_attribute
# Single shared exception instance raised by every patched put() path when
# asked to write an entity that carries _protobuf_projection_properties.
_protobuf_projection_disallowed_put_exception = IOError(
    'Write disallowed due to protobuf projection.')
def _protobuf_projection_disallow_ndb_put():
    """Make ndb.Model.put() and put_async() refuse projected entities.

    Monkey-patch the various put() functions to raise an exception when
    trying to put an entity that was created within a protobuf_projection
    context, i.e. one that has a _protobuf_projection_properties attribute.
    Entities without that attribute are passed to the original method
    unchanged.

    Calling this function more than once is a no-op after the first call,
    so repeated enable_ndb_protobuf_projection() calls (for example from a
    test's setUp) do not stack wrappers.

    Raises (from the patched methods):
        IOError: when the entity was created during protobuf projection.
    """
    if getattr(ndb.Model.put, '_protobuf_projection_patch', False):
        return

    def disallow_projected_puts(orig_func):
        # Build a guarded replacement for one of the put methods.
        def guarded_put(self, *args, **kwargs):
            if hasattr(self, '_protobuf_projection_properties'):
                raise _protobuf_projection_disallowed_put_exception
            return orig_func(self, *args, **kwargs)

        # Marker checked at the top of the outer function on later calls.
        guarded_put._protobuf_projection_patch = True
        return guarded_put

    ndb.Model.put = disallow_projected_puts(ndb.Model.put)
    ndb.Model.put_async = disallow_projected_puts(ndb.Model.put_async)
def _protobuf_projection_disallow_db_put():
    """Make db.put_async() and db.Model.put() refuse projected entities.

    Both entry points are wrapped so that any model instance carrying a
    _protobuf_projection_properties attribute (i.e. one created within a
    protobuf_projection context) cannot be written back. All other calls
    are forwarded to the original functions with their arguments untouched.

    Calling this function more than once is a no-op after the first call,
    so repeated enable_db_protobuf_projection() calls do not stack wrappers.

    Raises (from the patched functions):
        IOError: when any model involved was created during protobuf
            projection.
    """
    if getattr(db.put_async, '_protobuf_projection_patch', False):
        return

    orig_db_put_async = db.put_async

    def new_db_put_async(models, **kwargs):
        # Normalize into a separate local. The caller's `models` must reach
        # the original function as given: db.put_async treats a single model
        # and a list of models differently, so wrapping a lone model in a
        # tuple would change what the caller gets back.
        if isinstance(models, (list, tuple)):
            candidates = models
        else:
            candidates = (models,)

        if any(hasattr(m, '_protobuf_projection_properties')
               for m in candidates):
            raise _protobuf_projection_disallowed_put_exception
        return orig_db_put_async(models, **kwargs)

    # Marker checked at the top of this function on later calls.
    new_db_put_async._protobuf_projection_patch = True
    db.put_async = new_db_put_async

    orig_model_put = db.Model.put

    def new_db_model_put(self, *args, **kwargs):
        if hasattr(self, '_protobuf_projection_properties'):
            raise _protobuf_projection_disallowed_put_exception
        return orig_model_put(self, *args, **kwargs)

    db.Model.put = new_db_model_put
def enable_db_protobuf_projection():
    """Install all protobuf-projection monkey-patches for the db API.

    Guards attribute reads on db.Model, blocks puts of projected entities,
    and swaps in a property-stripping pb_to_entity on the db adapter.
    """
    _protobuf_projection_mock_getattribute(db.Model)
    _protobuf_projection_disallow_db_put()

    # Wrap the adapter method that was saved at module level.
    projected_pb_to_entity = _create_projected_pb_to_entity(
        _orig_db_pb_to_entity)
    datastore.DatastoreAdapter.pb_to_entity = projected_pb_to_entity
# The ndb adapter method as it is when this module is first loaded.
# enable_ndb_protobuf_projection() always wraps this saved reference rather
# than whatever is currently installed on ModelAdapter.
_orig_ndb_pb_to_entity = ndb_model.ModelAdapter.pb_to_entity
def enable_ndb_protobuf_projection():
    """Install all protobuf-projection monkey-patches for the ndb API.

    Guards attribute reads on ndb.Model, blocks puts of projected entities,
    and swaps in a property-stripping pb_to_entity on the ndb adapter.
    """
    _protobuf_projection_mock_getattribute(ndb.Model)
    _protobuf_projection_disallow_ndb_put()

    # Wrap the adapter method that was saved at module level.
    projected_pb_to_entity = _create_projected_pb_to_entity(
        _orig_ndb_pb_to_entity)
    ndb_model.ModelAdapter.pb_to_entity = projected_pb_to_entity
class ProjectedProtobufQueryTest(gae_model.GAEModelTestCase):
    """Tests protobuf projection for both db.Model and ndb.Model entities.

    Each API gets the same three checks: projected reads return the
    requested properties, projected entities cannot be put(), and reading
    an excluded property raises AttributeError.
    """

    def setUp(self):
        super(ProjectedProtobufQueryTest, self).setUp()
        db_util.enable_db_protobuf_projection()
        db_util.enable_ndb_protobuf_projection()

        # NOTE(review): presumably disable_ndb_memcache keeps reads of this
        # model from being served out of memcache -- confirm in db_util.
        @db_util.disable_ndb_memcache
        class TestNDBModel(ndb.Model):
            prop_a = ndb.TextProperty(indexed=False)
            prop_b = ndb.IntegerProperty(indexed=True, default=-1)
        self.test_ndb_class = TestNDBModel

        class TestDBModel(db.Model):
            prop_a = db.TextProperty(indexed=False)
            prop_b = db.IntegerProperty(indexed=True, default=-1)
        self.test_db_class = TestDBModel

    def test_ndb_simple(self):
        """Projected ndb reads expose the requested properties."""
        obj = self.test_ndb_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Get by key
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']):
            obj = key.get(use_cache=False)
            self.assertEqual(obj.prop_a, "Hello world")

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_b']):
            obj = key.get(use_cache=False)
            self.assertEqual(obj.prop_b, 3)

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_ndb_class,
                                         ['prop_a', 'prop_b']):
            obj = key.get(use_cache=False)
            self.assertEqual(obj.prop_a, "Hello world")
            self.assertEqual(obj.prop_b, 3)

        # Get by query
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_b']):
            obj = self.test_ndb_class.query().filter(
                self.test_ndb_class.prop_b == 3).get(use_cache=False)
            self.assertEqual(obj.prop_b, 3)

    def test_ndb_put_disallowed(self):
        """Every ndb put path rejects an entity read under projection."""
        obj = self.test_ndb_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # These closures look up `obj` when called, so they always act on
        # whichever entity was most recently assigned below.
        def call_put():
            obj.put()

        def call_put_async():
            obj.put_async()

        def call_global_put():
            ndb.put_multi([obj])

        with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']):
            obj = key.get(use_cache=False)

            # Calling put() on an instance created during protobuf
            # projection should always raise an IOError
            self.assertRaises(IOError, call_put)
            self.assertRaises(IOError, call_put_async)
            self.assertRaises(IOError, call_global_put)

        # But, calling put on an instance created by a regular datastore
        # read should put() correctly.
        obj = key.get(use_cache=False)
        try:
            call_put()
            call_put_async()
            call_global_put()
        except IOError:
            self.fail('IOError raised incorrectly.')

    def test_ndb_excluded_attribute_reading_disallowed(self):
        """Excluded ndb properties raise AttributeError when read."""
        obj = self.test_ndb_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Reading an excluded property from an instance created during
        # projection should always raise an AttributeError
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']):
            obj = key.get(use_cache=False)
            self.assertRaises(AttributeError, lambda: obj.prop_b)
        # The restriction sticks to the instance after the context exits.
        self.assertRaises(AttributeError, lambda: obj.prop_b)

        # But reading an excluded property should be fine during regular
        # datastore reads.
        obj = key.get(use_cache=False)
        try:
            obj.prop_b
        except AttributeError:
            self.fail('AttributeError raised incorrectly.')

    def test_db_simple(self):
        """Projected db reads expose the requested properties."""
        obj = self.test_db_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Get by key
        with db_util.protobuf_projection(self.test_db_class, ['prop_a']):
            obj = db.get(key)
            self.assertEqual(obj.prop_a, "Hello world")

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_db_class, ['prop_b']):
            obj = db.get(key)
            self.assertEqual(obj.prop_b, 3)

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_db_class,
                                         ['prop_a', 'prop_b']):
            obj = db.get(key)
            self.assertEqual(obj.prop_a, "Hello world")
            self.assertEqual(obj.prop_b, 3)

        # Get by query
        with db_util.protobuf_projection(self.test_db_class, ['prop_b']):
            obj = self.test_db_class.all().filter(
                'prop_b =', 3).get()
            self.assertEqual(obj.prop_b, 3)

    def test_db_put_disallowed(self):
        """Every db put path rejects an entity read under projection."""
        obj = self.test_db_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # These closures look up `obj` when called, so they always act on
        # whichever entity was most recently assigned below.
        def call_put():
            obj.put()

        # The single-model and list forms of db.put() are kept in separate
        # closures: inside one function the first raise would stop the
        # second form from ever being exercised under assertRaises.
        def call_global_put_single():
            db.put(obj)

        def call_global_put_list():
            db.put([obj])

        with db_util.protobuf_projection(self.test_db_class, ['prop_a']):
            obj = db.get(key)

            # Calling put() on an instance created during protobuf
            # projection should always raise an IOError
            self.assertRaises(IOError, call_put)
            self.assertRaises(IOError, call_global_put_single)
            self.assertRaises(IOError, call_global_put_list)

        # But, calling put on an instance created by a regular datastore
        # read should put() correctly.
        obj = db.get(key)
        try:
            call_put()
            call_global_put_single()
            call_global_put_list()
        except IOError:
            self.fail('IOError raised incorrectly.')

    def test_db_excluded_attribute_reading_disallowed(self):
        """Excluded db properties raise AttributeError when read."""
        obj = self.test_db_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Reading an excluded property from an instance created during
        # projection should always raise an AttributeError
        with db_util.protobuf_projection(self.test_db_class, ['prop_a']):
            obj = db.get(key)
            self.assertRaises(AttributeError, lambda: obj.prop_b)
        # The restriction sticks to the instance after the context exits.
        self.assertRaises(AttributeError, lambda: obj.prop_b)

        # But reading an excluded property should be fine during regular
        # datastore reads.
        obj = db.get(key)
        try:
            obj.prop_b
        except AttributeError:
            self.fail('AttributeError raised incorrectly.')
# Example usage: fetch up to 50 UserData entities while keeping only the
# properties named in `projection` on each one.
projection = ['user_email', 'username', 'user_nickname', 'user_id',
              'is_phantom', 'birthdate', 'may_be_child', 'restricted_domain',
              'child_capabilities']
with db_util.protobuf_projection(user_models.UserData, projection):
    students_data = user_models.UserData.all().fetch(50)
"""Utility functions to help write thread-safe code.
The main advantage of this code is that it knows whether the
dev-appserver is being run with 'threadsafe: true' or 'threadsafe:
false', and if it's false, it uses more efficient, but less
thread-safe, variants. (This 'knowing' is actually enforced in the
unittest.)
Therefore, don't use these methods if you are spawning threads
yourself; these are meant to be used for 'generic' GAE objects that do
not use threads unless we are using a threaded version of our webapp.
NOTE: This is imported by appengine_config.py, which is very
restricted in what it's safe to import. Only import python system
libs from this file.
"""
import os
import threading
import modules_util
# Whether the module's yaml config sets 'threadsafe'; treated as False when
# the key is absent. Every base class chosen below depends on this flag.
_USES_THREADSAFE = modules_util.module_yaml(full_parse=False).get(
    'threadsafe', False)
if _USES_THREADSAFE:
    import UserDict
    import threading
    _THREAD_LOCAL_SUPER = threading.local
    # Sadly, it's illegal to inherit from two C classes, and
    # threading.local and dict are both written in C.  So we have to
    # use UserDict instead of dict to make ThreadLocalDict() work.
    _DICT_SUPER = UserDict.UserDict
    RLock = threading.RLock
else:
    # No need for threadsafety if we don't see threadsafe: true in app.yaml.
    # Plain `object` and `dict` are used as the base classes instead.
    _THREAD_LOCAL_SUPER = object
    _DICT_SUPER = dict
    # I have to do this context-manager the old-fashioned way since
    # we do collections.defaultdict(threading.RLock) in cacheutil.py.
    class RLock(object):
        """A no-op version of threading.RLock; can only be used with 'with'."""
        def __enter__(self):
            pass
        def __exit__(self, *args):
            pass
class ThreadLocal(_THREAD_LOCAL_SUPER):
    """Drop-in stand-in for threading.local.

    There are two ways to use it: instantiate it directly to get a plain
    thread-local attribute holder, or subclass it to get a thread-local
    object with behavior of its own.

    Usage:
       class MyClass(thread_util.ThreadLocal):
           ...
       my_instance = MyClass()    # note this is at the global level
       my_random_container = thread_util.ThreadLocal()

    Every thread then sees its own version of my_instance, and likewise
    its own my_random_container.

    Prefer this over using threading.local directly: when the app is not
    configured as threadsafe the base class is plain 'object', so the cost
    of thread-local storage is only paid when it is actually needed.
    """
class ThreadLocalDict(_DICT_SUPER, _THREAD_LOCAL_SUPER):
    """A dict of which every thread gets its own independent version.

    All behavior comes from the two base classes chosen at import time;
    nothing is added here.
    """
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment