Created
October 13, 2008 20:58
-
-
Save Arachnid/16601 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Caveats:
- Calling put on a fetched entity more than once has extra overhead.
- Putting entities that have been concurrently modified since they were fetched has extra overhead even outside transactions.
- Creating and putting an entity to overwrite one already in the datastore has extra overhead.
- Attempting to create and put an entity in a transaction when one already exists in the datastore will fail.
- Strings are sorted by a 'smart' caseless sort rather than by unicode codepoint.
- Custom indexes with multiple sort orders are not supported.
- Does not (currently) support merge joins, so no support for multiple equality filters with no composite index.
Bonuses:
- The standard indexes cover filters and sorting (ascending or descending) on any single property.
- Custom indexes can be scanned in reverse.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.appengine.datastore import entity_pb | |
from google.appengine.datastore import datastore_index | |
from google.appengine.datastore import datastore_pb | |
from google.appengine.runtime import apiproxy_errors | |
import base64 | |
import collections | |
import couchdb | |
import itertools | |
import logging | |
import simplejson | |
import threading | |
import types | |
def PathToString(path):
  """Encode an entity_pb.Path as a URL-safe string (base64, '=' padding stripped)."""
  encoded = base64.urlsafe_b64encode(path.Encode())
  return encoded.replace('=', '')
def ReferenceValueToString(refval):
  """Convert a reference PropertyValue into the encoded string for its path."""
  path = entity_pb.Path()
  for src in refval.pathelement_list():
    dst = path.add_element()
    dst.set_type(src.type())
    # Each element carries either a name or a numeric id, never both.
    if src.has_name():
      dst.set_name(src.name())
    else:
      assert src.has_id()
      dst.set_id(src.id())
  return PathToString(path)
def StringToPath(s, path):
  """Decode a string produced by PathToString into *path* (in place).

  Restores the base64 '=' padding that PathToString strips before decoding.
  """
  s = str(s)
  remainder = len(s) % 4
  if remainder:
    s += '=' * (4 - remainder)
  path.ParseFromString(base64.urlsafe_b64decode(s))
def StringToReferenceValue(s, refval):
  """Decode an encoded path string into a reference PropertyValue (in place)."""
  decoded = entity_pb.Path()
  StringToPath(s, decoded)
  for src in decoded.element_list():
    dst = refval.add_pathelement()
    dst.set_type(src.type())
    # Mirror whichever identifier kind the element carries.
    if src.has_name():
      dst.set_name(src.name())
    else:
      assert src.has_id()
      dst.set_id(src.id())
def PathToTuple(path):
  """Flatten a path into a tuple of alternating (kind, name-or-id) values."""
  flat = ()
  for elt in path.element_list():
    flat += (elt.type(), elt.name() or elt.id())
  return flat
def TupleToPath(tup, path):
  """Populate *path* from a flat (kind, name-or-id, ...) tuple (in place).

  Inverse of PathToTuple.  Uses floor division so the pair count stays an
  integer under both classic and true division semantics.
  """
  for i in range(len(tup) // 2):
    elt = path.add_element()
    elt.set_type(tup[2*i])
    ident = tup[2*i + 1]
    # String identifiers are key names; anything else is a numeric id.
    if isinstance(ident, str):
      elt.set_name(ident)
    else:
      elt.set_id(ident)
def GetPropertyValue(pb):
  """Convert a Property protobuf into a (JSON-friendly value, meaning) pair.

  References, points and users become small tagged dicts (key "t"); BLOB
  strings are base64-encoded so they survive JSON serialisation; other
  strings are decoded to unicode.
  """
  pbval = pb.value()
  meaning = pb.meaning() if pb.has_meaning() else None
  if pbval.has_stringvalue():
    value = pbval.stringvalue()
    if meaning == entity_pb.Property.BLOB:
      # Raw bytes: base64-encode so the value is JSON-safe.
      value = base64.b64encode(value)
    else:
      value = unicode(value.decode('utf-8'))
  elif pbval.has_int64value():
    value = long(pbval.int64value())
  elif pbval.has_booleanvalue():
    value = bool(pbval.booleanvalue())
  elif pbval.has_doublevalue():
    value = pbval.doublevalue()
  elif pbval.has_referencevalue():
    value = {
      "t": "ref",
      "ref": ReferenceValueToString(pbval.referencevalue()),
    }
  elif pbval.has_pointvalue():
    value = {
      "t": "point",
      "x": pbval.pointvalue().x(),
      "y": pbval.pointvalue().y(),
    }
  elif pbval.has_uservalue():
    value = {
      "t": "user",
      "email": unicode(pbval.uservalue().email().decode('utf-8')),
      "auth_domain": unicode(pbval.uservalue().auth_domain().decode('utf-8')),
    }
  else:
    # No value field set at all: the property is None.
    value = None
  # A meaning_uri, when present, takes precedence over the numeric meaning.
  return (value, pb.meaning_uri() or meaning)
def GetPropertyValues(plist):
  """Group a property list by name into {name: flat (value, meaning, ...) tuple}."""
  by_name = collections.defaultdict(list)
  for prop in plist:
    by_name[prop.name()].append(prop)
  properties = {}
  for name, props in by_name.iteritems():
    # Repeated names are only legal for properties flagged as multiple.
    assert len(props) == 1 or props[0].multiple()
    flat = ()
    for prop in props:
      flat += GetPropertyValue(prop)
    properties[name] = flat
  return properties
def EntityToDocument(entity, key):
  """Build the CouchDB document representation of an entity protobuf."""
  properties = GetPropertyValues(entity.property_list())
  raw_properties = GetPropertyValues(entity.raw_property_list())
  # The CouchDB revision rides along as a synthetic '_rev' property;
  # extract it so it can be re-attached at the document's top level.
  rev = None
  if '_rev' in properties:
    rev = properties['_rev'][0]
    del properties['_rev']
  path = key.path()
  document = {
    "_id": PathToString(path),
    "path": PathToTuple(path),
    "type": path.element(path.element_size() - 1).type(),
    "p": properties,
    "r": raw_properties,
  }
  # Record the entity group only when it differs from the key's root element.
  if (entity.has_entity_group() and
      entity.entity_group().element_size() > 0 and
      entity.entity_group().element_list()[0] != path.element_list()[0]):
    document['group'] = PathToTuple(entity.entity_group())
  if rev:
    document['_rev'] = rev
  return document
# Dispatch table mapping a native Python value type to the PropertyValue
# setter that stores it.  NoneType maps to a no-op lambda so that None
# properties round-trip without any value field being set.
PROPERTY_SETTERS = {
  str: entity_pb.PropertyValue.set_stringvalue,
  unicode: entity_pb.PropertyValue.set_stringvalue,
  bool: entity_pb.PropertyValue.set_booleanvalue,
  float: entity_pb.PropertyValue.set_doublevalue,
  int: entity_pb.PropertyValue.set_int64value,
  long: entity_pb.PropertyValue.set_int64value,
  types.NoneType: lambda x,y: (x,y),
}
def SetPropertyValue(entity, name, value, meaning, multiple=False):
  """Append one decoded document value to *entity* as a Property protobuf.

  TEXT and BLOB meanings land in the raw (unindexed) property list;
  everything else becomes a regular indexed property.
  """
  if meaning in (entity_pb.Property.TEXT, entity_pb.Property.BLOB):
    prop = entity.add_raw_property()
  else:
    prop = entity.add_property()
  prop.set_name(name)
  if meaning != None:
    prop.set_meaning(meaning)
  prop.set_multiple(multiple)
  propval = prop.mutable_value()
  setter = PROPERTY_SETTERS.get(type(value), None)
  if setter:
    setter(propval, value)
  elif isinstance(value, dict):
    # Tagged dict produced by GetPropertyValue: a ref, point or user value.
    tag = value['t']
    if tag == "ref":
      ref = propval.mutable_referencevalue()
      ref.set_app(entity.key().app())
      StringToReferenceValue(value['ref'], ref)
    elif tag == "point":
      point = propval.mutable_pointvalue()
      point.set_x(value['x'])
      point.set_y(value['y'])
    elif tag == "user":
      user = propval.mutable_uservalue()
      user.set_email(value['email'])
      user.set_auth_domain(value['auth_domain'])
  elif isinstance(value, basestring) and meaning == entity_pb.Property.BLOB:
    # BLOBs were base64-encoded for storage; decode back to raw bytes.
    propval.set_stringvalue(base64.b64decode(value))
def DocumentToEntity(document, entity):
  """Populate an entity protobuf from its CouchDB document representation."""
  # Entity group: explicit when stored, otherwise the key's root element.
  if 'group' in document:
    TupleToPath(document['group'], entity.mutable_entity_group())
  else:
    root = entity.mutable_entity_group().add_element()
    root.CopyFrom(entity.key().path().element_list()[0])
  # Surface the CouchDB revision as a synthetic '_rev' property so a later
  # put can hand it back to the database.
  rev_prop = entity.add_property()
  rev_prop.set_name('_rev')
  rev_prop.mutable_value().set_stringvalue(document['_rev'])
  rev_prop.set_multiple(False)
  # Stored values are flat (value, meaning, ...) tuples; a 2-tuple is a
  # single value, anything longer is a multi-valued property.
  for name, flat in document['p'].iteritems():
    if len(flat) == 2:
      SetPropertyValue(entity, name, flat[0], flat[1])
    else:
      for i in range(0, len(flat), 2):
        SetPropertyValue(entity, name, flat[i], flat[i+1], True)
# NOTE(review): these constants are not referenced elsewhere in this file;
# presumably tags for index component kinds — confirm before removing.
KIND = 1
ANCESTOR = 2
def ParseQuery(query):
  """Translate a datastore_pb.Query into an IndexSpec describing the scan.

  Args:
    query: A datastore_pb.Query protobuf.

  Returns:
    An IndexSpec capturing the query's kind, ancestor, equality filters,
    inequality range and sort orders.

  Raises:
    AssertionError: for unsupported query shapes (missing kind, IN filters,
      multi-property filters, ancestor-only queries).
  """
  idx = IndexSpec()
  idx.type = query.kind()
  assert idx.type, "All queries must specify a kind."
  ancestor = query.has_ancestor()
  filters = query.filter_list()
  orders = query.order_list()
  for filt in filters:
    assert filt.op() != datastore_pb.Query_Filter.IN, 'Filter.op()==IN'
    nprops = len(filt.property_list())
    assert nprops == 1, 'Filter has %s properties, expected 1' % nprops
  # BUG FIX: the original asserted on the undefined name 'kind' here, which
  # raised NameError for every ancestor query.  This is the intended check.
  assert (not ancestor or filters
          or orders), "Ancestor-only queries not supported"
  if ancestor:
    idx.ancestor = PathToString(query.ancestor().path())
  eq_filters = [f for f in filters
                if f.op() in datastore_index.EQUALITY_OPERATORS]
  ineq_filters = [f for f in filters
                  if f.op() in datastore_index.INEQUALITY_OPERATORS]
  assert (len(eq_filters) + len(ineq_filters)
          == len(filters)), 'Not all filters used'
  for f in eq_filters:
    prop = f.property(0)
    propname = prop.name()
    propval = GetPropertyValue(prop)[0]
    idx.eq_fields.append(propname)
    idx.eq_values[propname].append(propval)
  idx.eq_fields.sort()
  # Orders on equality-filtered properties are trivially satisfied.
  orders = [x for x in orders if x.property() not in idx.eq_fields]
  ineq_property = None
  ineq_min = None
  ineq_max = None
  if ineq_filters:
    # All inequality filters must apply to a single property; collapse them
    # into a single (value, op) lower and upper bound.
    ineq_property = ineq_filters[0].property(0).name()
    for filt in ineq_filters:
      assert filt.property(0).name() == ineq_property
      op = (GetPropertyValue(filt.property(0))[0], filt.op())
      if op[1] in (filt.LESS_THAN, filt.LESS_THAN_OR_EQUAL):
        ineq_max = min(ineq_max, op) if ineq_max else op
      elif op[1] in (filt.GREATER_THAN, filt.GREATER_THAN_OR_EQUAL):
        ineq_min = max(ineq_min, op) if ineq_min else op
  if ineq_property:
    if orders:
      assert ineq_property == orders[0].property()
      # First order is satisfied by inequality
    else:
      idx.range_fields.append((ineq_property, datastore_index.ASCENDING))
  if ineq_min:
    idx.idx_min.append(ineq_min[0])
    if ineq_min[1] == datastore_pb.Query_Filter.GREATER_THAN:
      # First key after this one
      idx.idx_min.append(None)
  if ineq_max:
    if ineq_max[1] == datastore_pb.Query_Filter.LESS_THAN:
      idx.exclusive_max = True
    idx.idx_max.append(ineq_max[0])
  else:
    # Last key with this prefix
    idx.idx_max.append({})
  for order in orders:
    idx.range_fields.append((order.property(), order.direction()))
  return idx
class IdSequence(object):
  """Allocates monotonically increasing entity IDs for a single kind.

  IDs are reserved from a per-kind counter document in CouchDB in batches of
  BATCH_SIZE, so most allocations need no database round trip.
  """

  BATCH_SIZE = 10  # Number of IDs reserved per counter-document update.

  def __init__(self, db, kind):
    self.db = db
    # ID of the counter document for this kind.
    self._id = "_%s_seq" % (kind,)
    # Cached counter document; lazily loaded on first allocation.
    self._seqobj = None
    # Next unallocated ID in the currently reserved batch.
    self._next = None

  def next(self):
    """Return the next unused ID, reserving a new batch when exhausted."""
    if not self._seqobj:
      try:
        self._seqobj = self.db[self._id]
      except couchdb.client.ResourceNotFound:
        # First allocation ever for this kind: start the counter at 1.
        self._seqobj = { 'next': 1 }
    if self._next is None or self._next == self._seqobj['next']:
      # Batch exhausted (or first call): reserve BATCH_SIZE more IDs,
      # retrying if a concurrent writer advanced the counter first.
      while True:
        self._next = self._seqobj['next']
        self._seqobj['next'] += self.BATCH_SIZE
        try:
          self.db[self._id] = self._seqobj
          break
        except couchdb.client.PreconditionFailed:
          # Conflict: reload the counter document and try again.
          self._seqobj = self.db[self._id]
    ret = self._next
    self._next += 1
    return ret
class QueryPlan(object):
  """Lazily executes a CouchDB view query and yields matching documents.

  Rows are fetched from the view in batches, paging with startkey_docid and
  skip; rows already seen are de-duplicated via their document IDs.
  """

  def __init__(self, db, view_name, exclusive_max=False, batch_size=20, **args):
    self.db = db
    self.view_name = view_name
    self.args = args
    # BUG FIX: the original always assigned False here, silently discarding
    # the exclusive_max argument every caller passes.
    self.exclusive_max = exclusive_max
    self.batch_size = batch_size
    self._results = []        # Fetched view rows not yet returned.
    self._finished = False    # True once the view scan is exhausted.
    self._count = None        # total_rows reported by the view, once known.
    self._num_fetched = 0
    self._keys_returned = set()

  def _BuildResults(self, ents):
    """Yield the full document for each view row, honouring exclusive_max."""
    # TODO(nickjohnson): Improve this to use include_docs and/or multi-fetch
    # once they become available.
    for ent in ents:
      # An exclusive upper bound: stop just before the endkey row.
      if self.exclusive_max and ent.key == self.args['endkey']:
        self._finished = True
        return
      yield self.db[ent.id]

  def _GetMore(self, count):
    """Fetch at least *count* more rows from the view into self._results."""
    if count < self.batch_size:
      count = self.batch_size
    self.args['count'] = count
    result = self.db.view(self.view_name, **self.args)
    self._count = result.total_rows
    self._num_fetched += len(result.rows)
    for row in result.rows:
      rowid = row.id
      if rowid in self._keys_returned:
        break
      self._keys_returned.add(rowid)
      self._results.append(row)
    if len(result.rows) < count or self._num_fetched == self._count:
      self._finished = True
    elif count > 0:
      # Resume the next batch immediately after the last row seen.
      self.args['startkey_docid'] = result.rows[-1].id
      self.args['skip'] = 1

  def Next(self, count):
    """Return an iterator over up to *count* result documents."""
    if len(self._results) < count and not self._finished:
      self._GetMore(count - len(self._results))
    ret = self._BuildResults(self._results[:count])
    del self._results[:count]
    return ret

  def HasMore(self):
    """Return True if any results remain to be returned."""
    if self._count is None:
      self._GetMore(0)
    # BUG FIX: the original returned `self._finished or ...`, i.e. True
    # exactly when the scan was over.  More results exist while rows are
    # pending or the scan is not yet exhausted.
    return bool(self._results) or not self._finished

  def Count(self):
    """Return the total number of rows the view reports for this query."""
    if self._count is None:
      self._GetMore(0)
    return self._count
class IndexSpec(object):
  """Describes the index requirements of a parsed query.

  Built by ParseQuery; ConstructPlan then attempts to map the spec onto a
  concrete index definition and produce an executable QueryPlan.
  """

  def __init__(self):
    # Field names used in equality tests
    self.eq_fields = []
    # Values for equality fields
    self.eq_values = collections.defaultdict(list)
    # Field name, direction tuples used in range or order queries
    self.range_fields = []
    # Upper and lower bounds for non-equality fields
    self.idx_min = []
    self.idx_max = []
    # Is the upper limit exclusive?
    self.exclusive_max = False
    # Type name
    self.type = None
    # Ancestor key
    self.ancestor = None

  def ConstructPlan(self, db, query, idx_name, idx_def):
    """Attempts to construct a QueryPlan.

    Args:
      db: The database to execute the plan against.
      query: The query.
      idx_name: The fully qualified name of the view this index is realised in.
      idx_def: A tuple specifying the index to attempt to execute this
        plan against.  Layout (inferred from use below):
        (kind, has_ancestor, prop1, dir1, prop2, dir2, ...).

    Returns:
      A QueryPlan object, or None if the index is not suitable.
    """
    key_base = []
    if self.type != idx_def[0]:
      # Not the right kind
      return None
    if (self.ancestor == None) == idx_def[1]:
      # One specifies ancestor and the other doesn't.
      return None
    elif self.ancestor:
      key_base.append(self.ancestor)
    if len(idx_def) < len(self.eq_fields) * 2 + 2:
      # Not enough index elements to satisfy the equality constraints
      return None
    idx_eq_fields = [idx_def[i*2+2] for i in range(len(self.eq_fields))]
    if sorted(idx_eq_fields) != self.eq_fields:
      # First n fields of index do not match equality fields
      return None
    else:
      # Add the equality filters to the key base in index order
      eq_values = dict((x, list(y)) for x,y in self.eq_values.iteritems())
      for fld in idx_eq_fields:
        key_base.append(eq_values[fld].pop())
    # True=forward, False=reversed, None=either
    scan_order = None
    idx_ineq_fields = idx_def[len(self.eq_fields) * 2 + 2:]
    if len(self.range_fields) * 2 != len(idx_ineq_fields):
      return None
    for i, fld in enumerate(self.range_fields):
      if fld[0] != idx_ineq_fields[i*2]:
        # Different inequality field
        return None
      # All range/order fields must agree on scanning the index forward or
      # in reverse (custom indexes can be scanned either way).
      dir_cmp = (fld[1] == idx_ineq_fields[i*2+1])
      if scan_order is None:
        scan_order = dir_cmp
      elif dir_cmp != scan_order:
        # Directions do not match
        return None
    if scan_order is None:
      scan_order = True
    # When scanning in reverse, swap the range endpoints.
    if scan_order:
      startkey = key_base + self.idx_min
      endkey = key_base + self.idx_max
    else:
      endkey = key_base + self.idx_min
      startkey = key_base + self.idx_max
    return QueryPlan(db, idx_name, self.exclusive_max, startkey=startkey,
                     endkey=endkey, descending=not scan_order,
                     skip=query.offset())
class DatastoreCouchDBStub(object):
  """Datastore stub implementation that uses a couchdb instance."""

  MAX_PUT_ATTEMPTS = 5  # Most we will retry a put operation due to conflicts.

  # Design document providing the built-in single-property indexes.
  STANDARD_INDEXES_DOC = {
    'language': 'javascript',
    'views': {
      'Entities': {
        'map': """function(doc) {
          emit(doc.type, null);
        }""",
      },
      'EntitiesByProperty': {
        'map': """function (doc) {
          properties = doc.p;
          for(var p in properties) {
            vals = properties[p]
            for(var i = 0; i < vals.length; i+= 2)
              emit([doc.type, p, vals[i]], null);
          }
        }""",
      },
      'Schema': {
        'map': """function (doc) {
          emit(doc.path[doc.path.length - 2], doc.p);
        }""",
        'reduce': """function(keys, values) {
          properties = {};
          for(var i = 0; i < values.length; i++)
            for(var p in values[i])
              properties[p] = values[i][p]
          return properties;
        }""",
      }
    }
  }

  # Skeleton design document that composite index views are added to.
  COMPOSITE_INDEXES_TEMPLATE = {
    'language': 'javascript',
    'views': {},
    'indexes': [],
    'next_index': 1,
  }

  def __init__(self, server):
    self._server = server
    self._known_dbs = set()          # App IDs whose databases exist.
    self._sequences = {}             # (app, kind) -> IdSequence.
    self._sequence_lock = threading.Lock()
    self._transaction_documents = {} # tx handle -> {app: [pending docs]}.
    self._next_transaction_id = 1
    self._transaction_lock = threading.Lock()
    self._cursors = {}               # cursor id -> (app, QueryPlan).
    self._next_cursor_id = 1
    self._cursor_lock = threading.Lock()
    self._composite_indexes = {}     # app -> {kind: [index records]}.

  def MakeSyncCall(self, service, call, request, response):
    """RPC entry point: dispatch *call* to the matching _Dynamic_ method."""
    assert service == 'datastore_v3'
    explanation = []
    assert request.IsInitialized(explanation), explanation
    func = getattr(self, "_Dynamic_" + call, None)
    if not func:
      # BUG FIX: the original had no getattr default (so unknown calls
      # raised AttributeError first) and constructed the error without
      # raising it.
      raise apiproxy_errors.CallNotFoundError()
    func(request, response)
    assert response.IsInitialized(explanation), explanation

  def _CreateDb(self, appid):
    """Create the per-app database and install the index design docs."""
    self._server.create(appid)
    db = self._server[appid]
    db['_design/standard_indexes'] = self.STANDARD_INDEXES_DOC
    db['_design/composite_indexes'] = self.COMPOSITE_INDEXES_TEMPLATE

  def _GetDb(self, appid):
    """Return the database for *appid*, creating it on first use."""
    db = self._server[appid]
    if appid not in self._known_dbs:
      try:
        db.info()
      except couchdb.client.ResourceNotFound:
        self._CreateDb(appid)
      self._known_dbs.add(appid)
    return db

  def _GetNextId(self, appid, typename):
    """Allocate the next numeric entity ID for (appid, typename)."""
    self._sequence_lock.acquire()
    try:
      key = (appid, typename)
      sequence = self._sequences.get(key, None)
      if not sequence:
        sequence = IdSequence(self._GetDb(appid), typename)
        self._sequences[key] = sequence
      return sequence.next()
    finally:
      self._sequence_lock.release()

  def _UpdateRevisions(self, docs, db):
    """Refresh each doc's _rev from the datastore so an update can succeed."""
    for doc in docs:
      doc['_rev'] = db[doc['_id']]['_rev']

  def _WriteDatastore(self, docs, db, transactional=False):
    """Write *docs*, retrying revision conflicts outside transactions."""
    for i in range(self.MAX_PUT_ATTEMPTS):
      try:
        db.update(docs)
        return
      except couchdb.client.PreconditionFailed:
        if transactional:
          raise
        self._UpdateRevisions(docs, db)
    else:
      # Max attempts expired
      if transactional:
        raise apiproxy_errors.ApplicationError(
            datastore_pb.Error.CONCURRENT_TRANSACTION)
      else:
        raise apiproxy_errors.ApplicationError(datastore_pb.Error.TIMEOUT)

  def _UpdateIndexes(self, app, doc):
    """Rebuild the in-memory composite index cache from a design doc."""
    indexes = collections.defaultdict(list)
    # Group indexes by entity type
    for idx in doc['indexes']:
      indexes[idx['def'][0]].append(idx)
    self._composite_indexes[app] = indexes

  def _GetIndexDefinitions(self, app):
    """Returns a list of composite index definitions."""
    if app not in self._composite_indexes:
      db = self._GetDb(app)
      self._UpdateIndexes(app, db['_design/composite_indexes'])
    return self._composite_indexes[app]

  def _AddIndex(self, app, db, idx_def, map_func):
    """Adds a new composite index.

    Args:
      app: The application ID.
      db: The application's database.
      idx_def: The tuple describing this index.
      map_func: The javascript map function that generates this index.

    Returns:
      The numeric ID assigned to the new index.
    """
    for i in range(self.MAX_PUT_ATTEMPTS):
      try:
        doc = db['_design/composite_indexes']
        idx_id = doc['next_index']
        doc['next_index'] += 1
        doc['views'][str(idx_id)] = { 'map': map_func }
        doc['indexes'].append({'id': idx_id, 'def': idx_def})
        db['_design/composite_indexes'] = doc
        self._UpdateIndexes(app, doc)
        return idx_id
      except couchdb.client.PreconditionFailed:
        # Conflicting update: reload and try again.
        pass
    else:
      # Max retries reached
      raise apiproxy_errors.ApplicationError(datastore_pb.Error.TIMEOUT)

  def _DeleteIndex(self, idx_id):
    """Deletes a composite index.  Not yet implemented."""
    pass

  def _Dynamic_Put(self, put_request, put_response):
    """Write one or more entities, allocating IDs for incomplete keys."""
    entities = put_request.entity_list()
    app = entities[0].key().app()
    assert all(x.key().app() == app for x in entities)
    keys = []
    documents = []
    for entity in entities:
      assert entity.has_key()
      assert entity.key().path().element_size() > 0
      key = entity_pb.Reference()
      key.CopyFrom(entity.key())
      keys.append(key)
      last_path = key.path().element_list()[-1]
      if last_path.id() == 0 and not last_path.has_name():
        # Incomplete key: allocate an ID from the per-kind sequence.
        last_path.set_id(self._GetNextId(app, last_path.type()))
        assert entity.entity_group().element_size() == 0
      else:
        assert (entity.has_entity_group() and
                entity.entity_group().element_size() > 0)
      documents.append(EntityToDocument(entity, key))
    if put_request.has_transaction():
      # Defer the write until the transaction commits.
      txid = put_request.transaction().handle()
      self._transaction_documents[txid][app].extend(documents)
    else:
      self._WriteDatastore(documents, self._GetDb(app))
    put_response.key_list().extend(keys)

  def _Dynamic_Get(self, get_request, get_response):
    """Fetch the entities for the requested keys."""
    for key in get_request.key_list():
      db = self._GetDb(key.app())
      document = db[PathToString(key.path())]
      entity = get_response.add_entity().mutable_entity()
      entity.mutable_key().CopyFrom(key)
      DocumentToEntity(document, entity)

  def _Dynamic_Delete(self, delete_request, delete_response):
    """Delete the entities for the requested keys."""
    keys = delete_request.key_list()
    app = keys[0].app()
    assert all(x.app() == app for x in keys)
    db = self._GetDb(app)
    documents = []
    for key in keys:
      documents.append({'_id': PathToString(key.path()), '_deleted': True})
    # Fetch revision IDs for all the docs we're deleting
    self._UpdateRevisions(documents, db)
    if delete_request.has_transaction():
      txid = delete_request.transaction().handle()
      self._transaction_documents[txid][app].extend(documents)
    else:
      self._WriteDatastore(documents, db)

  def _CustomIndexPlanner(self, db, query, idxspec):
    """Satisfies queries that require a custom index."""
    indexes = self._GetIndexDefinitions(query.app()).get(idxspec.type, [])
    for idx in indexes:
      plan = idxspec.ConstructPlan(db, query, 'composite_indexes/%d'
                                   % (idx['id']), idx['def'])
      if plan:
        return plan
    raise apiproxy_errors.ApplicationError(
        datastore_pb.Error.NEED_INDEX,
        "This query requires a composite index that is not defined. "
        "You must update the index.yaml file in your application root.")

  def _SingleEqualityIndexPlanner(self, db, query, idxspec):
    """Satisfies queries that specify a kind and a single equality."""
    fld = list(idxspec.eq_fields)[0]
    startkey = [idxspec.type, fld, idxspec.eq_values[fld][0]]
    # {} sorts after every other JSON value, capping the key prefix.
    endkey = startkey + [{}]
    return QueryPlan(db, 'standard_indexes/EntitiesByProperty',
                     idxspec.exclusive_max, startkey=startkey, endkey=endkey,
                     skip=query.offset())

  def _SingleInequalityIndexPlanner(self, db, query, idxspec):
    """Satisfies queries with a single inequality or sort property."""
    fld, desc = idxspec.range_fields[0]
    startkey = [idxspec.type, fld] + idxspec.idx_min
    endkey = [idxspec.type, fld] + idxspec.idx_max
    if desc == datastore_index.DESCENDING:
      return QueryPlan(db, 'standard_indexes/EntitiesByProperty',
                       idxspec.exclusive_max, startkey=endkey, endkey=startkey,
                       descending=True, skip=query.offset())
    else:
      return QueryPlan(db, 'standard_indexes/EntitiesByProperty',
                       idxspec.exclusive_max, startkey=startkey, endkey=endkey,
                       descending=False, skip=query.offset())

  def _KindIndexPlanner(self, db, query, idxspec):
    """Satisfies queries that specify only an entity."""
    return QueryPlan(db, 'standard_indexes/Entities', idxspec.exclusive_max,
                     key=idxspec.type, skip=query.offset())

  def _GetQueryPlan(self, query):
    """Parse *query* and select the cheapest index able to satisfy it."""
    app = query.app()
    db = self._GetDb(app)
    idxspec = ParseQuery(query)
    if not idxspec.eq_fields and not idxspec.range_fields:
      return self._KindIndexPlanner(db, query, idxspec)
    elif len(idxspec.eq_fields) == 1 and not idxspec.range_fields:
      return self._SingleEqualityIndexPlanner(db, query, idxspec)
    elif not idxspec.eq_fields and len(idxspec.range_fields) == 1:
      return self._SingleInequalityIndexPlanner(db, query, idxspec)
    else:
      return self._CustomIndexPlanner(db, query, idxspec)

  def _Dynamic_RunQuery(self, query, query_result):
    """Start a query, registering a cursor for later Next calls."""
    plan = self._GetQueryPlan(query)
    self._cursor_lock.acquire()
    try:
      self._cursors[self._next_cursor_id] = (query.app(), plan)
      query_result.mutable_cursor().set_cursor(self._next_cursor_id)
      query_result.set_more_results(plan.HasMore())
      self._next_cursor_id += 1
    finally:
      self._cursor_lock.release()

  def _Dynamic_Next(self, next_request, query_result):
    """Return the next batch of results for a previously opened cursor."""
    cursor_id = next_request.cursor().cursor()
    # BUG FIX: the original unpacked .get(cursor_id, None), raising
    # TypeError for an unknown cursor instead of reporting no results.
    app, q = self._cursors.get(cursor_id, (None, None))
    if not q:
      query_result.set_more_results(False)
      return
    for result_doc in q.Next(next_request.count()):
      result = query_result.add_result()
      key = result.mutable_key()
      key.set_app(app)
      path = key.mutable_path()
      TupleToPath(result_doc['path'], path)
      DocumentToEntity(result_doc, result)
    query_result.set_more_results(q.HasMore())

  def _Dynamic_Count(self, query, integer64proto):
    """Return the number of results the query would produce."""
    q = self._GetQueryPlan(query)
    if not q:
      integer64proto.set_value(0)
    else:
      integer64proto.set_value(q.Count())

  def _Dynamic_BeginTransaction(self, request, transaction):
    """Open a transaction; writes are buffered until Commit."""
    self._transaction_lock.acquire()
    try:
      transaction.set_handle(self._next_transaction_id)
      txdocs = collections.defaultdict(list)
      self._transaction_documents[self._next_transaction_id] = txdocs
      self._next_transaction_id += 1
    finally:
      self._transaction_lock.release()

  def _Dynamic_Commit(self, transaction, transaction_response):
    """Flush a transaction's buffered writes atomically per app."""
    txid = transaction.handle()
    try:
      for app, docs in self._transaction_documents[txid].iteritems():
        db = self._GetDb(app)
        self._WriteDatastore(docs, db, True)
    except couchdb.client.PreconditionFailed:
      raise apiproxy_errors.ApplicationError(
          datastore_pb.Error.CONCURRENT_TRANSACTION)
    finally:
      del self._transaction_documents[txid]

  def _Dynamic_Rollback(self, transaction, transaction_response):
    """Discard a transaction's buffered writes."""
    txid = transaction.handle()
    if txid in self._transaction_documents:
      del self._transaction_documents[txid]

  def _Dynamic_GetSchema(self, app_str, schema):
    """Not implemented."""
    pass

  def _GenerateJsMapFunc(self, idx_def):
    """Generate the javascript map function realising a composite index.

    The function is built inside-out with successive '%s' substitutions:
    an outer kind check, an optional ancestor-prefix loop, one nested loop
    per indexed property, and finally the emit() call.
    """
    map_func = ("function(doc) {"
                "key=Array();"
                "idx=Array();"
                "if(doc.type == '%s') {"
                "%%s"
                "}"
                "}" % (idx_def[0],))
    if idx_def[1]:
      # Ancestor
      map_func %= ("for(var aidx = 0; aidx < len(doc.path) - 2; aidx += 2) {"
                   "key.push(doc.path.slice(0, aidx));"
                   "%s"
                   "key.pop();"
                   "}")
    prev_indexes = {}
    for i in range(len(idx_def) // 2 - 1):
      propname = idx_def[i*2 + 2]
      # Repeated properties resume scanning after the previous occurrence
      # so multi-valued properties enumerate combinations, not repeats.
      if propname in prev_indexes:
        start = "idx[%d] + 2" % (prev_indexes[propname])
      else:
        start = "0"
      prev_indexes[propname] = i
      map_func %= ("for(idx[%(num)d] = %(start)s; "
                   "idx[%(num)d] < doc.%(prop)s.length; idx[%(num)d] += 2) {"
                   "key.push(doc.%(prop)s[idx[%(num)d]]);"
                   "%%s"
                   "key.pop();"
                   "}" % {"num": i, "prop": propname, "start": start})
    map_func %= "emit(key, null);"
    return map_func

  def _Dynamic_CreateIndex(self, index, id_response):
    """Create a composite index and return its assigned ID."""
    app = index.app_id()
    idx = index.definition()
    idx_props = idx.property_list()
    # BUG FIX: the original asserted on a (generator, message) tuple, which
    # is always true and never enforced anything.
    assert all(idx_props[0].direction() == x.direction()
               for x in idx_props), \
        "Indexes with multiple sort orders not yet supported."
    idx_def = ((idx.entity_type(), idx.ancestor()) +
               sum(((x.name(), x.direction()) for x in idx_props), ()))
    map_func = self._GenerateJsMapFunc(idx_def)
    idx_id = self._AddIndex(app, self._GetDb(app), idx_def, map_func)
    id_response.set_value(idx_id)

  def _Dynamic_GetIndices(self, app_str, composite_indices):
    """Not implemented."""
    pass

  def _Dynamic_UpdateIndex(self, index, void):
    """Not implemented."""
    pass

  def _Dynamic_DeleteIndex(self, index, void):
    """Not implemented."""
    pass

  def _EndRequest(self):
    """Frees per-request resources."""
    if self._transaction_documents:
      logging.warn("Cleaning up %d forgotten transactions."
                   % (len(self._transaction_documents)))
    self._transaction_documents = {}
    self._cursors = {}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cStringIO | |
import couchdb | |
import datetime | |
import os | |
import mocker | |
import unittest | |
from google.appengine.api import apiproxy_stub_map | |
from google.appengine.api import datastore_admin | |
from google.appengine.api import users | |
from google.appengine.datastore import datastore_index | |
from google.appengine.ext import db | |
import datastore_couchdb_stub | |
class TestModel(db.Model):
  # Minimal two-property model exercised throughout the stub tests.
  a_string = db.StringProperty()
  an_integer = db.IntegerProperty()
class FullModel(db.Model):
  # One property of every supported type, each with a default, so a single
  # put() covers the whole EntityToDocument conversion surface.
  a_string = db.StringProperty(default="foo")
  a_bool = db.BooleanProperty(default=True)
  an_integer = db.IntegerProperty(default=42)
  a_float = db.FloatProperty(default=1.999)
  a_datetime = db.DateTimeProperty(default=datetime.datetime(2008, 10, 01))
  a_list = db.ListProperty(int, default=[1,2,3])
  a_reference = db.SelfReferenceProperty()
  a_user = db.UserProperty()
  a_blob = db.BlobProperty(default="Blobby!")
  some_text = db.TextProperty(default="Lorem Ipsum Dolor Si Amet")
  # Left unset to exercise the None round-trip.
  null_property = db.IntegerProperty()
class MockViewRow(object):
  """Stand-in for a couchdb view result row exposing id/key/value attributes."""

  def __init__(self, id, key, value):
    self.id, self.key, self.value = id, key, value
class TestDatastore(mocker.MockerTestCase): | |
def setUp(self): | |
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | |
self.mock_server = self.mocker.mock(couchdb.client.Server) | |
stub = datastore_couchdb_stub.DatastoreCouchDBStub(self.mock_server) | |
apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub) | |
self.mock_db = self.mocker.mock(couchdb.client.Database) | |
self.mock_server["test"] | |
self.mocker.count(1, None) | |
self.mocker.result(self.mock_db) | |
self.mock_db.info() | |
os.environ['APPLICATION_ID'] = "test" | |
os.environ['AUTH_DOMAIN'] = "mydomain.com" | |
self.test_entity = { | |
'path': ('TestModel', 'foo'), | |
'type': 'TestModel', | |
'_id': 'CxIJVGVzdE1vZGVsIgNmb28M', | |
'p': { | |
'an_integer': (42L, None), | |
'a_string': ('foo', None), | |
}, | |
'r': {}, | |
} | |
self.result_keys = [ | |
(1, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYAgw'), | |
(2, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYAww'), | |
(3, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBAw'), | |
(4, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBQw'), | |
(5, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBgw'), | |
(6, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYAQw'), | |
(7, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBww'), | |
] | |
def testPut(self): | |
m = self.mocker | |
mock_server = self.mock_server | |
mock_db = self.mock_db | |
# Expect update to be called with an entity | |
mock_db.update([self.test_entity]) | |
# Expect to be queried for a sequence | |
datastore_couchdb_stub.IdSequence.BATCH_SIZE = 1 | |
mock_db['_TestModel_seq'] | |
m.throw(couchdb.client.ResourceNotFound()) | |
mock_db['_TestModel_seq'] = {'next':2} | |
mock_db.update([ | |
{ | |
'path': ('TestModel', 1), | |
'type': 'TestModel', | |
'_id': 'CxIJVGVzdE1vZGVsGAEM', | |
'p': { | |
'an_integer': (24L, None), | |
'a_string': ('oof', None), | |
}, | |
'r': {}, | |
}, | |
]) | |
# Simulate contention on the sequence counter | |
mock_db['_TestModel_seq'] = {'next':3} | |
m.throw(couchdb.client.PreconditionFailed()) | |
mock_db['_TestModel_seq'] | |
m.result({'_id':'_TestModel_seq','_rev':'12345','next':5}) | |
mock_db['_TestModel_seq'] = {'_id':'_TestModel_seq','_rev':'12345','next':6} | |
mock_db.update([ | |
{ | |
'path': ('TestModel', 1, 'TestModel', 5), | |
'type': 'TestModel', | |
'_id': 'CxIJVGVzdE1vZGVsGAEMCxIJVGVzdE1vZGVsGAUM', | |
'p': { | |
'an_integer': (25L, None), | |
'a_string': ('oof', None), | |
}, | |
'r': {}, | |
}, | |
]) | |
m.replay() | |
# Test a model with a name provided | |
mymodel = TestModel(key_name="foo") | |
mymodel.a_string = "foo" | |
mymodel.an_integer = 42 | |
mymodel.put() | |
# Test a model with a generated ID | |
mymodel = TestModel() | |
mymodel.a_string = "oof" | |
mymodel.an_integer = 24 | |
mymodel.put() | |
# Test a model with another generated ID | |
mymodel = TestModel(parent=mymodel) | |
mymodel.a_string = "oof" | |
mymodel.an_integer = 25 | |
mymodel.put() | |
def testFullPut(self):
  """Puts a FullModel exercising every supported property type.

  Verifies the exact document the stub hands to CouchDB: indexed
  properties live under 'p', unindexed (raw) properties such as Text
  and Blob live under 'r', and each value is stored together with its
  protocol-buffer "meaning" code.
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # Expect update to be called with an entity
  mock_db.update([
    {
      'path': ('FullModel', 'test'),
      'type': 'FullModel',
      # Document ID is the url-safe base64 of the encoded entity path
      # (see PathToString at the top of the file).
      '_id': 'CxIJRnVsbE1vZGVsIgR0ZXN0DA',
      'p': {
        # List values are flattened into alternating (value, meaning)
        # pairs rather than nested.
        'a_list': (1L, None, 2L, None, 3L, None),
        'a_user': (
          {
            't': 'user',
            'auth_domain': 'mydomain.com',
            'email': 'foo@bar.com'
          }, None),
        # Datetimes stored as microseconds since the epoch; 7 is
        # presumably the GD_WHEN meaning code -- confirm against
        # entity_pb.Property.
        'a_datetime': (1222819200000000L, 7),
        'an_integer': (42L, None),
        'a_string': ('foo', None),
        'a_bool': (True, None),
        'null_property': (None, None),
        # References are stored as a dict tagged with 't': 'ref',
        # carrying the encoded key of the target entity.
        'a_reference': ({
          'ref': 'CxIJRnVsbE1vZGVsIgR0ZXN0DA',
          't': 'ref'}, None),
        'a_float': (1.9990000000000001, None)
      },
      # Unindexed properties: meanings 15 and 14 presumably correspond
      # to TEXT and (base64-encoded) BLOB -- verify against entity_pb.
      'r': {
        'some_text': ('Lorem Ipsum Dolor Si Amet', 15),
        'a_blob': ('QmxvYmJ5IQ==', 14),
      },
    }
  ])
  m.replay()
  # NOTE(review): FullModel's definition is outside this chunk; the
  # remaining property values above are presumably its defaults --
  # confirm against the model class.
  mymodel = FullModel(key_name="test")
  mymodel.a_user = users.User("foo@bar.com")
  mymodel.a_reference = db.Key.from_path('FullModel', 'test')
  mymodel.put()
def testConflicts(self):
  """A put that hits a revision conflict refetches and retries."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  # The first update attempt collides with a newer stored revision.
  database.update([self.test_entity])
  mock.throw(couchdb.client.PreconditionFailed())
  # The stub then reads the document back to discover the current rev.
  database['CxIJVGVzdE1vZGVsIgNmb28M']
  mock.result({'_id':'CxIJVGVzdE1vZGVsIgNmb28M', '_rev': '12345'})
  # And retries the update carrying that revision.
  retry_entity = dict(self.test_entity)
  retry_entity['_rev'] = '12345'
  database.update([retry_entity])
  mock.replay()
  entity = TestModel(key_name="foo")
  entity.a_string = "foo"
  entity.an_integer = 42
  entity.put()
def testCreateDB(self):
  """The database and its design docs are created on first use.

  NOTE(review): the m.throw() below attaches to the most recently
  recorded expectation, which must have been set up in the test
  fixture outside this chunk (presumably the server's database
  lookup) -- confirm against setUp.
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # Simulate the database not existing yet; the stub creates it.
  m.throw(couchdb.client.ResourceNotFound())
  mock_server.create("test")
  # Expect a design doc to be created
  mock_db["_design/standard_indexes"] = \
      datastore_couchdb_stub.DatastoreCouchDBStub.STANDARD_INDEXES_DOC
  mock_db["_design/composite_indexes"] = \
      datastore_couchdb_stub.DatastoreCouchDBStub.COMPOSITE_INDEXES_TEMPLATE
  # Expect update to be called with an entity
  mock_db.update([self.test_entity])
  m.replay()
  mymodel = TestModel(key_name="foo")
  mymodel.a_string = "foo"
  mymodel.an_integer = 42
  mymodel.put()
def testGet(self):
  """An entity that was put can be fetched back with identical values."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  # The put sends the entity document to CouchDB.
  database.update([self.test_entity])
  # The subsequent get fetches it back, now carrying a revision.
  database[self.test_entity['_id']]
  stored = dict(self.test_entity)
  stored['_rev'] = '12345'
  mock.result(stored)
  mock.replay()
  original = TestModel(key_name="foo")
  original.a_string = "foo"
  original.an_integer = 42
  original.put()
  fetched = TestModel.get_by_key_name("foo")
  self.failUnlessEqual(original.a_string, fetched.a_string)
  self.failUnlessEqual(original.an_integer, fetched.an_integer)
def testDelete(self):
  """Deleting an entity fetches its revision, then writes a tombstone."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  # Two fetches: one for get_by_key_name, one inside delete() to learn
  # the current revision.
  database[self.test_entity['_id']]
  stored = dict(self.test_entity)
  stored['_rev'] = '12345'
  mock.result(stored)
  mock.count(2, 2)
  # The delete itself is a bulk update of a _deleted stub document.
  database.update([{
    '_id': stored['_id'],
    '_rev': stored['_rev'],
    '_deleted': True,
  }])
  mock.replay()
  victim = TestModel.get_by_key_name('foo')
  victim.delete()
def mockQueryResponse(self, mock_db, keys):
  """Records mock expectations for one query round-trip.

  Args:
    mock_db: The mock CouchDB database.
    keys: Sequence of (integer id, document key) pairs the view should
        return; each fetched entity gets an_integer == id and
        a_string == 'String number <id>'.

  NOTE(review): this must be called immediately after recording a
  mock_db.view(...) expectation -- the m.result() below attaches the
  ViewResults mock as the return value of the caller's most recently
  recorded event.
  """
  m = self.mocker
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  # The query code reads total_rows once...
  result.total_rows
  m.result(len(keys))
  # ...and may read the row list one or more times.
  result.rows
  m.result([MockViewRow(x[1], 'TestModel', None) for x in keys])
  m.count(1, None)
  # Each returned key is then fetched individually to build the entity.
  for i, key in keys:
    mock_db[key]
    m.result({
      '_id': key,
      '_rev': '12345',
      'path': ['TestModel', i],
      'type': 'TestModel',
      'r': {},
      'p': {
        'an_integer': [i, None],
        'a_string': ['String number %d' % (i,), None]
      }
    })
def testQuery(self):
  """A kind-only query scans the standard Entities index."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  database.view('standard_indexes/Entities', count=20, key='TestModel', skip=0)
  view_results = mock.mock(couchdb.client.ViewResults)
  mock.result(view_results)
  self.mockQueryResponse(database, self.result_keys[:5])
  mock.replay()
  # A plain TestModel.all() returns the entities in index order.
  expected = 1
  for model in TestModel.all().fetch(20):
    self.failUnlessEqual(model.an_integer, expected)
    self.failUnlessEqual(model.a_string, 'String number %d' % (expected,))
    expected += 1
def testPropertyIndexQueries(self):
  """Inequality filters translate to startkey/endkey view ranges.

  The EntitiesByProperty view is keyed by [kind, property, value].
  In CouchDB view collation {} sorts after all scalar values, so it is
  used as an "end of this property" sentinel; appending None to a
  startkey starts the scan just past the bare value key, making the
  lower bound exclusive.
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # (filter string, value, expected result keys) triples driven in the
  # replay phase below.
  filters = [
    ("an_integer >=", 5, self.result_keys[:-3]),
    ("an_integer >", 5, self.result_keys[:-2]),
    ("an_integer <=", 3, self.result_keys[:3]),
  ]
  # an_integer >= 5: inclusive lower bound at the value itself.
  mock_db.view('standard_indexes/EntitiesByProperty', count=20,
               startkey=['TestModel', 'an_integer', 5L],
               endkey=['TestModel', 'an_integer', {}], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, filters[0][2])
  # an_integer > 5: trailing None makes the lower bound exclusive.
  mock_db.view('standard_indexes/EntitiesByProperty', count=20,
               startkey=['TestModel', 'an_integer', 5L, None],
               endkey=['TestModel', 'an_integer', {}], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, filters[1][2])
  # an_integer <= 3: inclusive upper bound at the value.
  mock_db.view('standard_indexes/EntitiesByProperty', count=20,
               startkey=['TestModel', 'an_integer'],
               endkey=['TestModel', 'an_integer', 3L], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, filters[2][2])
  # an_integer < 3
  # NOTE(review): this expectation is identical to the <= 3 one above
  # (same endkey, same 3-row response) -- presumably the stub issues
  # the same range for < and <=; the zip() in the replay phase only
  # checks the first two rows, so the extra row goes unasserted.
  mock_db.view('standard_indexes/EntitiesByProperty', count=20,
               startkey=['TestModel', 'an_integer'],
               endkey=['TestModel', 'an_integer', 3L], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, filters[2][2])
  # 3 <= an_integer <= 5: both bounds from the same property.
  mock_db.view('standard_indexes/EntitiesByProperty', count=20,
               startkey=['TestModel', 'an_integer', 3L],
               endkey=['TestModel', 'an_integer', 5L], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, self.result_keys[2:4])
  m.replay()
  # Test a query specifying a greater-than constraint on one property
  for filter, val, keys in filters:
    q = TestModel.all().filter(filter, val).fetch(20)
    for (i, key), tm in zip(keys, q):
      self.failUnlessEqual(tm.an_integer, i)
      self.failUnlessEqual(tm.a_string, 'String number %d' % (i,))
  q = TestModel.all().filter("an_integer <", 3).fetch(20)
  for (i, key), tm in zip(self.result_keys[:2], q):
    self.failUnlessEqual(tm.an_integer, i)
    self.failUnlessEqual(tm.a_string, 'String number %d' % (i,))
  q = TestModel.all().filter("an_integer >=", 3).filter("an_integer <=", 5)
  for (i, key), tm in zip(self.result_keys[2:4], q.fetch(20)):
    self.failUnlessEqual(tm.an_integer, i)
    self.failUnlessEqual(tm.a_string, 'String number %d' % (i,))
def testFullQuery(self):
  """Tests querying for an entity with a full set of properties.

  The fetch result itself is discarded: the test only checks that a
  document containing every supported property type deserializes
  without raising.
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # Equality filter: exact value plus {} sentinel; note no 'descending'
  # kwarg, unlike the inequality queries elsewhere in this file.
  mock_db.view('standard_indexes/EntitiesByProperty', count=20,
               startkey=['FullModel', 'a_string', 'foo'],
               endkey=['FullModel', 'a_string', 'foo', {}], skip=0)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  # NOTE(review): a second ViewResults mock is recorded immediately
  # after the first (same pattern appears in testQuery followed by
  # mockQueryResponse); presumably the later result overrides the
  # earlier one on the view expectation -- confirm against mocker's
  # recording semantics.
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  result.total_rows
  m.result(1)
  result.rows
  m.result([MockViewRow(x[1], 'FullModel', None)
            for x in self.result_keys][:1])
  m.count(1, None)
  for i, key in self.result_keys[:1]:
    mock_db[key]
    m.result({
      '_id': key,
      '_rev': '12345',
      'path': ['FullModel', i],
      'type': 'FullModel',
      # Unindexed (raw) properties: Text and base64 Blob.
      'r': {
        'some_text': ['Lorem Ipsum Dolor Si Amet', 15],
        'a_blob': ['QmxvYmJ5IQ==', 14]
      },
      'p': {
        'a_list': [1, None, 2, None, 3, None],
        'a_datetime': [1222819200000000L, 7],
        'a_user': [None, None],
        'an_integer': [42, None],
        'a_string': ['foo', None],
        'a_bool': [True, None],
        'null_property': [None, None],
        # NOTE(review): a tuple where the sibling values use lists;
        # real JSON decoding would produce lists -- confirm the stub
        # treats both alike.
        'a_reference': ({
          'ref': 'CxIJRnVsbE1vZGVsIgR0ZXN0DA',
          't': 'ref'}, None),
        'a_float': [1.9990000000000001, None]
      }
    })
  m.replay()
  # Test an equality query
  FullModel.all().filter('a_string =', 'foo').fetch(20)
def testBasicSort(self):
  """Single-property sorts map to forward/reverse index scans."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  # Ascending sort on an_integer scans the property index forwards.
  database.view('standard_indexes/EntitiesByProperty', count=20,
                startkey=['TestModel', 'an_integer'],
                endkey=['TestModel', 'an_integer', {}], skip=0,
                descending=False)
  view_result = mock.mock(couchdb.client.ViewResults)
  mock.result(view_result)
  self.mockQueryResponse(database, self.result_keys)
  # Descending sort on a_string scans the same index in reverse, with
  # startkey and endkey swapped accordingly.
  database.view('standard_indexes/EntitiesByProperty', count=20,
                startkey=['TestModel', 'a_string', {}],
                endkey=['TestModel', 'a_string'], descending=True, skip=0)
  view_result = mock.mock(couchdb.client.ViewResults)
  mock.result(view_result)
  self.mockQueryResponse(database, self.result_keys[::-1])
  mock.replay()
  expected_len = len(self.result_keys)
  self.failUnlessEqual(
      len(TestModel.all().order("an_integer").fetch(20)), expected_len)
  self.failUnlessEqual(
      len(TestModel.all().order("-a_string").fetch(20)), expected_len)
def testOffset(self):
  """Query offsets translate to the view's skip parameter."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  # Ordered query with offset 5: skip=5 on the property index.
  database.view('standard_indexes/EntitiesByProperty', count=20,
                startkey=['TestModel', 'an_integer'],
                endkey=['TestModel', 'an_integer', {}], skip=5,
                descending=False)
  view_result = mock.mock(couchdb.client.ViewResults)
  mock.result(view_result)
  self.mockQueryResponse(database, self.result_keys[5:])
  # Plain kind query with offset 5: skip=5 on the Entities index.
  database.view('standard_indexes/Entities', count=20, key='TestModel', skip=5)
  view_result = mock.mock(couchdb.client.ViewResults)
  mock.result(view_result)
  self.mockQueryResponse(database, self.result_keys[5:])
  mock.replay()
  remaining = len(self.result_keys) - 5
  self.failUnlessEqual(
      len(TestModel.all().order("an_integer").fetch(20, 5)), remaining)
  self.failUnlessEqual(len(TestModel.all().fetch(20, 5)), remaining)
def testCount(self):
  """count() reports the view's total_rows, not the fetched rows."""
  mock = self.mocker
  server = self.mock_server
  database = self.mock_db
  database.view('standard_indexes/Entities', count=20, key='TestModel', skip=0)
  view_result = mock.mock(couchdb.client.ViewResults)
  mock.result(view_result)
  view_result.total_rows
  mock.result(5)
  # The returned rows (only two of them) are deliberately fewer than
  # total_rows; count() must ignore them.
  view_result.rows
  mock.result([MockViewRow(x[1], 'TestModel', None)
               for x in self.result_keys[5:]])
  mock.count(1, None)
  mock.replay()
  self.failUnlessEqual(TestModel.all().count(), 5)
def testTransactions(self):
  """Transactions retry on contention and roll back on exceptions.

  Two transactions run: one that succeeds (after a simulated
  PreconditionFailed forces a refetch-and-retry) and one that raises
  TestError.  Only one final successful put is expected -- the failing
  transaction's write must never reach the datastore.
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # Expect a fetch
  mock_db[self.test_entity['_id']]
  new_test_entity = dict(self.test_entity)
  new_test_entity['_rev'] = '12345'
  m.result(new_test_entity)
  # Expect a put, and simulate contention
  mock_db.update([{
    'path': ('TestModel', 'foo'),
    'type': 'TestModel',
    '_id': 'CxIJVGVzdE1vZGVsIgNmb28M',
    '_rev': '12345',
    'p': {
      'an_integer': (123L, None),
      'a_string': ('foo', None),
    },
    'r': {},
  }])
  m.throw(couchdb.client.PreconditionFailed())
  # Expect another fetch, and another put
  mock_db[self.test_entity['_id']]
  new_test_entity = dict(self.test_entity)
  new_test_entity['_rev'] = '12346'
  m.result(new_test_entity)
  m.count(2, 2) # Once for the succeeding tx, once for the failing one.
  # Expect a put at the fresh revision; recorded only once, so the
  # failing transaction must not commit.
  mock_db.update([{
    'path': ('TestModel', 'foo'),
    'type': 'TestModel',
    '_id': 'CxIJVGVzdE1vZGVsIgNmb28M',
    '_rev': '12346',
    'p': {
      'an_integer': (123L, None),
      'a_string': ('foo', None),
    },
    'r': {},
  }])
  m.replay()
  # Sentinel exception to abort the second transaction.
  class TestError(Exception): pass
  def testTransaction(fail):
    # Read-modify-write inside a transaction; optionally blow up before
    # the commit.
    obj = TestModel.get_by_key_name("foo")
    obj.an_integer = 123
    obj.put()
    if fail: raise TestError()
  db.run_in_transaction(testTransaction, False)
  try:
    db.run_in_transaction(testTransaction, True)
  except TestError:
    pass
def testCreateIndexes(self):
  """Composite index definitions become views in the design document.

  Creates three indexes from an index.yaml snippet and verifies the
  generated javascript map functions: a basic two-property index, an
  ancestor index, and a multiply-indexed (repeated) property.
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # Expect the composite indexes design doc to be fetched
  mock_db['_design/composite_indexes']
  m.result({
    'language': 'javascript',
    'views': {},
    'indexes': [],
    'next_index': 1,
  })
  m.count(1, None)
  # First two index creations: contents not checked individually.
  mock_db['_design/composite_indexes'] = mocker.ANY
  m.count(2)
  # Third request includes all 3 indexes
  mock_db['_design/composite_indexes'] = {
    'next_index': 4,
    'indexes': [
      {'id': 1, 'def': ('TestModel', False, 'a_string', 1, 'an_integer', 1)},
      {'id': 2, 'def': ('TestModel', True, 'a_string', 1)},
      {'id': 3, 'def': ('FullModel', False, 'a_list', 1, 'a_list', 1)}
    ],
    'language': 'javascript',
    'views': {
      '1': { # Basic index on 2 properties
        'map': "function(doc) {"
          "key=Array();idx=Array();"
          "if(doc.type == 'TestModel') {"
          "for(idx[0] = 0; idx[0] < doc.a_string.length; idx[0] += 2) {"
          "key.push(doc.a_string[idx[0]]);"
          "for(idx[1] = 0; idx[1] < doc.an_integer.length; idx[1] += 2) {"
          "key.push(doc.an_integer[idx[1]]);"
          "emit(key, null);"
          "key.pop();"
          "}"
          "key.pop();"
          "}"
          "}"
          "}"
      },
      '2': { # Ancestor index on 1 property
        # NOTE(review): 'len(doc.path)' is not valid javascript; this
        # expectation presumably mirrors the stub's template verbatim,
        # so the bug (if any) is upstream in the template -- flag there.
        'map': "function(doc) {"
          "key=Array();idx=Array();"
          "if(doc.type == 'TestModel') {"
          "for(var aidx = 0; aidx < len(doc.path) - 2; aidx += 2) {"
          "key.push(doc.path.slice(0, aidx));"
          "for(idx[0] = 0; idx[0] < doc.a_string.length; idx[0] += 2) {"
          "key.push(doc.a_string[idx[0]]);"
          "emit(key, null);"
          "key.pop();"
          "}"
          "key.pop();"
          "}"
          "}"
          "}"
      },
      '3': { # Multiply-indexed single property
        'map': "function(doc) {"
          "key=Array();idx=Array();"
          "if(doc.type == 'FullModel') {"
          "for(idx[0] = 0; idx[0] < doc.a_list.length; idx[0] += 2) {"
          "key.push(doc.a_list[idx[0]]);"
          "for(idx[1] = idx[0] + 2; idx[1] < doc.a_list.length; idx[1] += 2) {"
          "key.push(doc.a_list[idx[1]]);"
          "emit(key, null);"
          "key.pop();"
          "}"
          "key.pop();"
          "}"
          "}"
          "}"
      },
    }
  }
  m.replay()
  # Create a set of indexes specified in an index.yaml file
  indexes = datastore_index.ParseIndexDefinitions("""
indexes:

# Basic (simple) index
- kind: TestModel
  properties:
  - name: a_string
  - name: an_integer

# With ancestor
- kind: TestModel
  ancestor: yes
  properties:
  - name: a_string

# Multiply indexed field
- kind: FullModel
  properties:
  - name: a_list
  - name: a_list
""").indexes
  # Create the indexes in definition order.  The previous version keyed
  # a dict on each proto's encoded definition and iterated it with
  # iteritems(); Python 2 dict ordering is arbitrary, so the assigned
  # index IDs (and the final design-doc contents asserted above) could
  # vary from run to run, making the test flaky.
  for index in datastore_admin.IndexDefinitionsToProtos("test", indexes):
    datastore_admin.CreateIndex(index)
def testCompositeIndexQuery(self):
  """Queries needing a composite index scan its dedicated view.

  The design doc declares three indexes; index 1 (TestModel: a_string
  asc, an_integer asc) serves all three queries below.  View keys hold
  only the property values -- the kind is implicit in the per-index
  view name ('composite_indexes/1').
  """
  m = self.mocker
  mock_server = self.mock_server
  mock_db = self.mock_db
  # The stub loads the design doc to find a matching index.
  mock_db['_design/composite_indexes']
  m.result({
    'next_index': 4,
    'indexes': [
      {'id': 1, 'def': ('TestModel', False, 'a_string', 1, 'an_integer', 1)},
      {'id': 2, 'def': ('TestModel', True, 'a_string', 1)},
      {'id': 3, 'def': ('FullModel', False, 'a_list', 1, 'a_list', 1)}
    ],
    'language': 'javascript',
    'views': {}
  })
  # Equality on both properties: exact key with {} end sentinel.
  mock_db.view('composite_indexes/1', count=20,
               startkey=['String number 5', 5],
               endkey=['String number 5', 5, {}], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, self.result_keys[4:5])
  # Equality plus an open range on an_integer: trailing None makes the
  # lower bound exclusive (1 < an_integer < 10).
  mock_db.view('composite_indexes/1', count=20,
               startkey=['String number 5', 1, None],
               endkey=['String number 5', 10], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, self.result_keys[4:5])
  # Range plus sort order on a_string alone; the second sort property
  # falls out of the index's key ordering.
  mock_db.view('composite_indexes/1', count=20,
               startkey=['String number 2', None],
               endkey=['String number 6'], skip=0,
               descending=False)
  result = m.mock(couchdb.client.ViewResults)
  m.result(result)
  self.mockQueryResponse(mock_db, self.result_keys[2:5])
  m.replay()
  # Equality filter on two properties
  q = TestModel.all().filter("a_string =", "String number 5")
  q.filter("an_integer =", 5)
  tm = q.fetch(20)
  self.failUnlessEqual(len(tm), 1)
  self.failUnlessEqual(tm[0].an_integer, 5)
  self.failUnlessEqual(tm[0].a_string, 'String number 5')
  # Equality and range
  q = TestModel.all().filter("a_string =", "String number 5")
  q.filter("an_integer >", 1)
  q.filter("an_integer <", 10)
  tm = q.fetch(20)
  self.failUnlessEqual(len(tm), 1)
  self.failUnlessEqual(tm[0].an_integer, 5)
  self.failUnlessEqual(tm[0].a_string, 'String number 5')
  # Range and sort order
  q = TestModel.all()
  q.filter("a_string >", "String number 2")
  q.filter("a_string <", "String number 6")
  q.order("a_string")
  q.order("an_integer")
  tm = q.fetch(20)
  self.failUnlessEqual(len(tm), 3)
  self.failUnlessEqual(tm[0].an_integer, 3)
  self.failUnlessEqual(tm[0].a_string, 'String number 3')
# Allow running this test module directly from the command line.
if __name__ == "__main__":
  unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment