Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created October 13, 2008 20:58
Show Gist options
  • Save Arachnid/16601 to your computer and use it in GitHub Desktop.
Save Arachnid/16601 to your computer and use it in GitHub Desktop.
Caveats:
- Calling put on a fetched entity more than once has extra overhead.
- Putting entities that have been concurrently modified since they were fetched has extra overhead even outside transactions.
- Creating and putting an entity to overwrite one already in the datastore has extra overhead.
- Attempting to create and put an entity in a transaction when one already exists in the datastore will fail.
- Strings are sorted by a 'smart' caseless sort rather than by unicode codepoint.
- Custom indexes with multiple sort orders are not supported.
- Does not (currently) support merge joins, so a query with multiple equality filters is only supported when a matching composite index exists.
Bonuses:
- The standard indexes cover filters and sorting (ascending or descending) on any single property.
- Custom indexes can be scanned in reverse.
from google.appengine.datastore import entity_pb
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.runtime import apiproxy_errors
import base64
import collections
import couchdb
import itertools
import logging
import simplejson
import threading
import types
def PathToString(path):
  """Returns the encoded path as URL-safe base64 text with '=' padding removed.

  Args:
    path: Object whose Encode() returns the serialised path.
  """
  encoded = base64.urlsafe_b64encode(path.Encode())
  return encoded.replace('=', '')
def ReferenceValueToString(refval):
  """Converts a reference value into the string form of its path.

  Each path element of the reference is copied (type plus name or id) into a
  fresh entity_pb.Path, which is then passed to PathToString.

  Args:
    refval: Object exposing pathelement_list().

  Returns:
    The result of PathToString for the copied path.
  """
  path = entity_pb.Path()
  for source in refval.pathelement_list():
    target = path.add_element()
    target.set_type(source.type())
    if not source.has_name():
      # An element without a name must carry a numeric id.
      assert source.has_id()
      target.set_id(source.id())
    else:
      target.set_name(source.name())
  return PathToString(path)
def StringToPath(s, path):
  """Decodes text produced by PathToString into the supplied path object.

  Args:
    s: URL-safe base64 text without '=' padding.
    path: Object whose ParseFromString() receives the decoded bytes.
  """
  encoded = str(s)
  # Put back the '=' padding that PathToString removes.
  remainder = len(encoded) % 4
  if remainder:
    encoded = encoded + '=' * (4 - remainder)
  path.ParseFromString(base64.urlsafe_b64decode(encoded))
def StringToReferenceValue(s, refval):
  """Fills a reference value with the path elements encoded in a string.

  Args:
    s: Text accepted by StringToPath.
    refval: Object whose add_pathelement() returns a new element to populate.
  """
  decoded = entity_pb.Path()
  StringToPath(s, decoded)
  for source in decoded.element_list():
    target = refval.add_pathelement()
    target.set_type(source.type())
    if not source.has_name():
      # An element without a name must carry a numeric id.
      assert source.has_id()
      target.set_id(source.id())
    else:
      target.set_name(source.name())
def PathToTuple(path):
  """Flattens a path into a tuple of alternating type and name-or-id values.

  For every element the type comes first, followed by the element's name when
  it is truthy and by its id otherwise.

  Args:
    path: Object exposing element_list().

  Returns:
    A flat tuple; empty when the path has no elements.
  """
  flattened = []
  for element in path.element_list():
    flattened.append(element.type())
    flattened.append(element.name() or element.id())
  return tuple(flattened)
def TupleToPath(tup, path):
  """Appends the elements described by a flat sequence to a path.

  Args:
    tup: Flat sequence of alternating type and name-or-id values, in the
      layout produced by PathToTuple. A trailing unpaired value is ignored.
    path: Object whose add_element() returns an element offering set_type(),
      set_name() and set_id().
  """
  # Floor division keeps the pair count an integer.
  for i in range(len(tup) // 2):
    kind, ident = tup[2 * i], tup[2 * i + 1]
    elt = path.add_element()
    elt.set_type(kind)
    # Names can be unicode as well as str (e.g. once a document has been
    # through a JSON round trip), so test against basestring; every other
    # value is treated as a numeric id.
    if isinstance(ident, basestring):
      elt.set_name(ident)
    else:
      elt.set_id(ident)
def GetPropertyValue(pb):
  """Extracts a JSON-friendly value and its meaning from a property.

  Args:
    pb: Property object exposing value(), has_meaning(), meaning() and
      meaning_uri().

  Returns:
    A (value, meaning) tuple. The value is a unicode string (base64 text for
    blobs), long, bool, float, a tagged dict for references, points and users,
    or None when no value field is set. The second item is meaning_uri() when
    that is truthy, otherwise the meaning (None if the property has none).
  """
  pbval = pb.value()
  meaning = None
  if pb.has_meaning():
    meaning = pb.meaning()

  if pbval.has_stringvalue():
    raw = pbval.stringvalue()
    if meaning != entity_pb.Property.BLOB:
      value = unicode(raw.decode('utf-8'))
    else:
      # Blobs are arbitrary bytes, so they are stored as base64 text.
      value = base64.b64encode(raw)
  elif pbval.has_int64value():
    value = long(pbval.int64value())
  elif pbval.has_booleanvalue():
    value = bool(pbval.booleanvalue())
  elif pbval.has_doublevalue():
    value = pbval.doublevalue()
  elif pbval.has_referencevalue():
    encoded_ref = ReferenceValueToString(pbval.referencevalue())
    value = {"t": "ref", "ref": encoded_ref}
  elif pbval.has_pointvalue():
    point = pbval.pointvalue()
    value = {"t": "point", "x": point.x(), "y": point.y()}
  elif pbval.has_uservalue():
    user = pbval.uservalue()
    value = {
      "t": "user",
      "email": unicode(user.email().decode('utf-8')),
      "auth_domain": unicode(user.auth_domain().decode('utf-8')),
    }
  else:
    value = None
  return (value, pb.meaning_uri() or meaning)
def GetPropertyValues(plist):
  """Groups properties by name and flattens their (value, meaning) pairs.

  Args:
    plist: Iterable of property objects accepted by GetPropertyValue.

  Returns:
    A dict mapping each property name to a flat tuple
    (value1, meaning1, value2, meaning2, ...), one pair per occurrence.
  """
  grouped = collections.defaultdict(list)
  for prop in plist:
    grouped[prop.name()].append(prop)

  result = {}
  for name, props in grouped.iteritems():
    # A name may only repeat when the property is flagged as multiple.
    assert len(props) == 1 or props[0].multiple()
    flat = ()
    for prop in props:
      flat += GetPropertyValue(prop)
    result[name] = flat
  return result
def EntityToDocument(entity, key):
  """Builds the CouchDB document dict for an entity.

  Args:
    entity: Entity exposing property_list(), raw_property_list() and the
      entity group accessors.
    key: Key whose path() identifies the entity.

  Returns:
    A dict with '_id', 'path', 'type', 'p' (properties) and 'r' (raw
    properties); 'group' is added when the entity group root differs from the
    key's first path element, and '_rev' when the entity carried a truthy
    '_rev' property.
  """
  properties = GetPropertyValues(entity.property_list())
  raw_properties = GetPropertyValues(entity.raw_property_list())

  # '_rev' travels as an ordinary property; lift it out of the property map.
  rev = None
  if '_rev' in properties:
    rev = properties.pop('_rev')[0]

  key_path = key.path()
  last_index = key_path.element_size() - 1
  document = {
    "_id": PathToString(key_path),
    "path": PathToTuple(key_path),
    "type": key_path.element(last_index).type(),
    "p": properties,
    "r": raw_properties,
  }

  if entity.has_entity_group() and entity.entity_group().element_size() > 0:
    group_root = entity.entity_group().element_list()[0]
    if group_root != key.path().element_list()[0]:
      document['group'] = PathToTuple(entity.entity_group())

  if rev:
    document['_rev'] = rev
  return document
# Maps the exact Python type of a document value to the unbound
# entity_pb.PropertyValue setter that stores it. SetPropertyValue looks the
# setter up with type(value) and calls it as setter(propval, value).
PROPERTY_SETTERS = {
str: entity_pb.PropertyValue.set_stringvalue,
unicode: entity_pb.PropertyValue.set_stringvalue,
bool: entity_pb.PropertyValue.set_booleanvalue,
float: entity_pb.PropertyValue.set_doublevalue,
int: entity_pb.PropertyValue.set_int64value,
long: entity_pb.PropertyValue.set_int64value,
# None has nothing to store: this entry only returns its arguments and leaves
# the PropertyValue untouched.
types.NoneType: lambda x,y: (x,y),
}
def SetPropertyValue(entity, name, value, meaning, multiple=False):
  """Adds one property, decoded from its document form, to an entity.

  This is the inverse of GetPropertyValue.

  Args:
    entity: Entity to add the property to. TEXT and BLOB meanings are added
      as raw properties, everything else as regular properties.
    name: Property name.
    value: Document value: a string, number, bool, None, or a dict tagged
      with 't' ('ref', 'point' or 'user').
    meaning: Property meaning, or None to leave it unset.
    multiple: Whether the property is flagged as multi-valued.
  """
  if meaning in (entity_pb.Property.TEXT, entity_pb.Property.BLOB):
    prop = entity.add_raw_property()
  else:
    prop = entity.add_property()
  prop.set_name(name)
  if meaning is not None:
    prop.set_meaning(meaning)
  prop.set_multiple(multiple)
  propval = prop.mutable_value()

  # Strings are handled before the generic setter table: blobs were written
  # as base64 text and must be decoded, which the table lookup (it matches
  # str/unicode first) would otherwise bypass.
  if isinstance(value, basestring):
    if meaning == entity_pb.Property.BLOB:
      propval.set_stringvalue(base64.b64decode(value))
    elif isinstance(value, unicode):
      # GetPropertyValue decodes string values from UTF-8; mirror it here.
      propval.set_stringvalue(value.encode('utf-8'))
    else:
      propval.set_stringvalue(value)
    return

  setter = PROPERTY_SETTERS.get(type(value), None)
  if setter:
    setter(propval, value)
  elif isinstance(value, dict):
    proptype = value['t']
    if proptype == "ref":
      ref = propval.mutable_referencevalue()
      ref.set_app(entity.key().app())
      StringToReferenceValue(value['ref'], ref)
    elif proptype == "point":
      pt = propval.mutable_pointvalue()
      pt.set_x(value['x'])
      pt.set_y(value['y'])
    elif proptype == "user":
      user = propval.mutable_uservalue()
      user.set_email(value['email'])
      user.set_auth_domain(value['auth_domain'])
def DocumentToEntity(document, entity):
  """Populates an entity from a stored document.

  The entity's key must already be set: it supplies the entity group root
  when the document has no 'group' entry.

  Args:
    document: Document dict in the layout produced by EntityToDocument, with
      a '_rev' entry.
    entity: Entity to fill with the entity group, a '_rev' property, and all
      regular ('p') and raw ('r') properties.
  """
  # Set entity group
  if 'group' in document:
    TupleToPath(document['group'], entity.mutable_entity_group())
  else:
    root = entity.mutable_entity_group().add_element()
    root.CopyFrom(entity.key().path().element_list()[0])

  # Set _rev special property
  rev_prop = entity.add_property()
  rev_prop.set_name('_rev')
  rev_prop.mutable_value().set_stringvalue(document['_rev'])
  rev_prop.set_multiple(False)

  # Set other properties. Raw (TEXT/BLOB) properties are stored under 'r';
  # SetPropertyValue routes them to the raw property list by their meaning.
  for section in (document['p'], document.get('r', {})):
    for name, values in section.iteritems():
      # Values are flat (value, meaning, value, meaning, ...) sequences; a
      # single pair is restored as a non-multiple property.
      multiple = len(values) != 2
      for i in range(0, len(values), 2):
        SetPropertyValue(entity, name, values[i], values[i + 1], multiple)
# Module-level integer constants. Nothing in the visible part of this file
# reads them. NOTE(review): presumably tags for query components - confirm
# whether they are still needed.
KIND = 1
ANCESTOR = 2
def ParseQuery(query):
  """Translates a datastore query into an IndexSpec.

  Args:
    query: Query exposing kind(), has_ancestor(), ancestor(), filter_list()
      and order_list().

  Returns:
    An IndexSpec holding the kind, optional ancestor, equality fields and
    values, range fields and the index key bounds for the query.

  Raises:
    AssertionError: If the query has no kind, uses an IN filter, has a filter
      without exactly one property, has inequality filters on more than one
      property, or its first remaining sort order is not the inequality
      property.
  """
  idx = IndexSpec()
  idx.type = query.kind()
  assert idx.type, "All queries must specify a kind."
  ancestor = query.has_ancestor()
  filters = query.filter_list()
  orders = query.order_list()
  for flt in filters:
    assert flt.op() != datastore_pb.Query_Filter.IN, 'Filter.op()==IN'
    nprops = len(flt.property_list())
    assert nprops == 1, 'Filter has %s properties, expected 1' % nprops
  # The kind was validated above, so this can only fail if that check is
  # ever relaxed.
  assert (not ancestor or idx.type or filters
          or orders), "Ancestor-only queries not supported"
  if ancestor:
    idx.ancestor = PathToString(query.ancestor().path())

  eq_filters = [f for f in filters
                if f.op() in datastore_index.EQUALITY_OPERATORS]
  ineq_filters = [f for f in filters
                  if f.op() in datastore_index.INEQUALITY_OPERATORS]
  assert (len(eq_filters) + len(ineq_filters)
          == len(filters)), 'Not all filters used'

  for f in eq_filters:
    prop = f.property(0)
    propname = prop.name()
    propval = GetPropertyValue(prop)[0]
    idx.eq_fields.append(propname)
    idx.eq_values[propname].append(propval)
  idx.eq_fields.sort()
  # Sorting on a property with an equality filter has no effect.
  orders = [x for x in orders if x.property() not in idx.eq_fields]

  less_ops = (datastore_pb.Query_Filter.LESS_THAN,
              datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL)
  greater_ops = (datastore_pb.Query_Filter.GREATER_THAN,
                 datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL)
  ineq_property = None
  ineq_min = None
  ineq_max = None
  if ineq_filters:
    ineq_property = ineq_filters[0].property(0).name()
    for flt in ineq_filters:
      assert flt.property(0).name() == ineq_property
      # (value, operator) pairs compare by value first.
      op = (GetPropertyValue(flt.property(0))[0], flt.op())
      if op[1] in less_ops:
        ineq_max = min(ineq_max, op) if ineq_max else op
      elif op[1] in greater_ops:
        ineq_min = max(ineq_min, op) if ineq_min else op

  if ineq_property:
    if orders:
      assert ineq_property == orders[0].property()
      # First order is satisfied by inequality
    else:
      idx.range_fields.append((ineq_property, datastore_index.ASCENDING))

  if ineq_min:
    idx.idx_min.append(ineq_min[0])
    if ineq_min[1] == datastore_pb.Query_Filter.GREATER_THAN:
      # First key after this one
      idx.idx_min.append(None)
  if ineq_max:
    if ineq_max[1] == datastore_pb.Query_Filter.LESS_THAN:
      idx.exclusive_max = True
    idx.idx_max.append(ineq_max[0])
  else:
    # Last key with this prefix
    idx.idx_max.append({})

  for order in orders:
    idx.range_fields.append((order.property(), order.direction()))
  return idx
class IdSequence(object):
  """Hands out integer IDs for one kind, reserving them in batches.

  The counter lives in a document named "_<kind>_seq" whose 'next' field is
  the first ID that has not been reserved yet.
  """

  # Number of IDs reserved per write of the sequence document.
  BATCH_SIZE = 10

  def __init__(self, db, kind):
    """Creates a sequence for `kind` stored in `db`; no I/O happens here."""
    self.db = db
    self._id = "_%s_seq" % (kind,)
    self._seqobj = None
    self._next = None

  def _LoadSequence(self):
    """Returns the stored sequence document, or a new one starting at 1."""
    try:
      return self.db[self._id]
    except couchdb.client.ResourceNotFound:
      return { 'next': 1 }

  def _ReserveBatch(self):
    """Claims the next BATCH_SIZE IDs, reloading and retrying on conflict."""
    reserved = False
    while not reserved:
      self._next = self._seqobj['next']
      self._seqobj['next'] += self.BATCH_SIZE
      try:
        self.db[self._id] = self._seqobj
      except couchdb.client.PreconditionFailed:
        # Another writer got there first; start again from the stored copy.
        self._seqobj = self.db[self._id]
      else:
        reserved = True

  def next(self):
    """Returns the next unused ID, reserving a new batch when needed."""
    if not self._seqobj:
      self._seqobj = self._LoadSequence()
    exhausted = self._next is None or self._next == self._seqobj['next']
    if exhausted:
      self._ReserveBatch()
    allocated = self._next
    self._next = allocated + 1
    return allocated
class QueryPlan(object):
  """Runs a query against one view and hands the results out in batches."""

  def __init__(self, db, view_name, exclusive_max=False, batch_size=20, **args):
    """Initialises the plan; the view is not queried until results are needed.

    Args:
      db: Database exposing view() and item access by document ID.
      view_name: Name of the view to query.
      exclusive_max: If true, rows whose key equals the upper bound of the
        scan are left out of the results.
      batch_size: Minimum number of rows requested per view call.
      **args: Extra keyword arguments passed to db.view() (startkey, endkey,
        descending, skip, ...).
    """
    self.db = db
    self.view_name = view_name
    self.args = args
    self.exclusive_max = exclusive_max
    self.batch_size = batch_size
    self._results = []
    self._finished = False
    self._count = None
    self._num_fetched = 0
    self._keys_returned = set()

  def _BuildResults(self, ents):
    """Yields the documents for the given view rows.

    With an exclusive upper bound, rows whose key equals that bound are not
    yielded. On an ascending scan the bound is 'endkey' and reaching it ends
    the plan; on a descending scan the bound is 'startkey' and matching rows
    are skipped.
    """
    # TODO(nickjohnson): Improve this to use include_docs and/or multi-fetch
    # once they become available.
    descending = bool(self.args.get('descending'))
    bound_name = 'startkey' if descending else 'endkey'
    # Plans without that bound (e.g. key= lookups) have nothing to exclude.
    check_bound = self.exclusive_max and bound_name in self.args
    for ent in ents:
      if check_bound and ent.key == self.args[bound_name]:
        if descending:
          continue
        self._finished = True
        return
      yield self.db[ent.id]

  def _GetMore(self, count):
    """Fetches at least batch_size further rows from the view."""
    if count < self.batch_size:
      count = self.batch_size
    self.args['count'] = count
    result = self.db.view(self.view_name, **self.args)
    self._count = result.total_rows
    self._num_fetched += len(result.rows)
    for row in result.rows:
      rowid = row.id
      if rowid in self._keys_returned:
        break
      self._keys_returned.add(rowid)
      self._results.append(row)
    if len(result.rows) < count or self._num_fetched == self._count:
      self._finished = True
    elif count > 0:
      # Resume after the last row seen on the next call.
      self.args['startkey_docid'] = result.rows[-1].id
      self.args['skip'] = 1

  def Next(self, count):
    """Returns an iterator over up to `count` further result documents."""
    if len(self._results) < count and not self._finished:
      self._GetMore(count - len(self._results))
    ret = self._BuildResults(self._results[:count])
    del self._results[:count]
    return ret

  def HasMore(self):
    """Returns True while buffered or unfetched results remain."""
    if self._count is None:
      self._GetMore(0)
    return bool(self._results) or not self._finished

  def Count(self):
    """Returns the total_rows value reported by the view."""
    if self._count is None:
      self._GetMore(0)
    return self._count
class IndexSpec(object):
  """Describes the index a query needs and matches it against index tuples."""

  def __init__(self):
    # Field names used in equality tests
    self.eq_fields = []
    # Values for equality fields
    self.eq_values = collections.defaultdict(list)
    # Field name, direction tuples used in range or order queries
    self.range_fields = []
    # Upper and lower bounds for non-equality fields
    self.idx_min = []
    self.idx_max = []
    # Is the upper limit exclusive?
    self.exclusive_max = False
    # Type name
    self.type = None
    # Ancestor key
    self.ancestor = None

  def ConstructPlan(self, db, query, idx_name, idx_def):
    """Attempts to construct a QueryPlan.

    Args:
      db: The database to execute the plan against.
      query: The query.
      idx_name: The fully qualified name of the view this index is realised in.
      idx_def: A sequence specifying the index to attempt to execute this
        plan against: (kind, has_ancestor, name1, direction1, ...).

    Returns:
      A QueryPlan object, or None if the index is not suitable.
    """
    # Offset in idx_def of the first field after the equality fields.
    range_start = 2 + 2 * len(self.eq_fields)

    if self.type != idx_def[0]:
      # Not the right kind
      return None
    if (self.ancestor is None) == idx_def[1]:
      # One specifies ancestor and the other doesn't.
      return None

    key_prefix = []
    if self.ancestor:
      key_prefix.append(self.ancestor)

    if len(idx_def) < range_start:
      # Not enough index elements to satisfy the equality constraints
      return None
    idx_eq_fields = list(idx_def[2:range_start:2])
    if sorted(idx_eq_fields) != self.eq_fields:
      # First n fields of index do not match equality fields
      return None

    # Add the equality filters to the key prefix in index order, consuming
    # copies so the spec itself is left untouched.
    remaining = {}
    for name, values in self.eq_values.iteritems():
      remaining[name] = list(values)
    for name in idx_eq_fields:
      key_prefix.append(remaining[name].pop())

    idx_range = idx_def[range_start:]
    if len(self.range_fields) * 2 != len(idx_range):
      return None

    # True=forward, False=reversed, None=either
    scan_order = None
    pairs = zip(self.range_fields, idx_range[0::2], idx_range[1::2])
    for (name, direction), idx_field, idx_direction in pairs:
      if name != idx_field:
        # Different inequality field
        return None
      same_direction = (direction == idx_direction)
      if scan_order is None:
        scan_order = same_direction
      elif same_direction != scan_order:
        # Directions do not match
        return None
    if scan_order is None:
      scan_order = True

    low = key_prefix + self.idx_min
    high = key_prefix + self.idx_max
    if scan_order:
      startkey, endkey = low, high
    else:
      # A reversed scan starts from the upper bound.
      startkey, endkey = high, low
    return QueryPlan(db, idx_name, self.exclusive_max, startkey=startkey,
                     endkey=endkey, descending=not scan_order,
                     skip=query.offset())
class DatastoreCouchDBStub(object):
  """Datastore stub implementation that uses a couchdb instance.

  One database is used per application ID. Entities are stored as documents
  in the layout produced by EntityToDocument; queries are answered from the
  views in two design documents, 'standard_indexes' and 'composite_indexes'.
  """

  MAX_PUT_ATTEMPTS = 5  # Most we will retry a put operation due to conflicts.

  # Design document holding the built-in views.
  STANDARD_INDEXES_DOC = {
    'language': 'javascript',
    'views': {
      'Entities': {
        'map': ("function(doc) {\n"
                "emit(doc.type, null);\n"
                "}"),
      },
      'EntitiesByProperty': {
        'map': ("function (doc) {\n"
                "properties = doc.p;\n"
                "for(var p in properties) {\n"
                "vals = properties[p]\n"
                "for(var i = 0; i < vals.length; i+= 2)\n"
                "emit([doc.type, p, vals[i]], null);\n"
                "}\n"
                "}"),
      },
      'Schema': {
        'map': ("function (doc) {\n"
                "emit(doc.path[doc.path.length - 2], doc.p);\n"
                "}"),
        'reduce': ("function(keys, values) {\n"
                   "properties = {};\n"
                   "for(var i = 0; i < values.length; i++)\n"
                   "for(var p in values[i])\n"
                   "properties[p] = values[i][p]\n"
                   "return properties;\n"
                   "}"),
      }
    }
  }

  # Initial content of the design document that holds composite indexes.
  COMPOSITE_INDEXES_TEMPLATE = {
    'language': 'javascript',
    'views': {},
    'indexes': [],
    'next_index': 1,
  }

  def __init__(self, server):
    """Creates a stub that stores its data on the given couchdb server."""
    self._server = server
    self._known_dbs = set()
    self._sequences = {}
    self._sequence_lock = threading.Lock()
    self._transaction_documents = {}
    self._next_transaction_id = 1
    self._transaction_lock = threading.Lock()
    self._cursors = {}
    self._next_cursor_id = 1
    self._cursor_lock = threading.Lock()
    self._composite_indexes = {}

  def MakeSyncCall(self, service, call, request, response):
    """Dispatches an API call to the matching _Dynamic_<call> method.

    Raises:
      apiproxy_errors.CallNotFoundError: If no handler exists for `call`.
    """
    assert service == 'datastore_v3'
    explanation = []
    assert request.IsInitialized(explanation), explanation
    func = getattr(self, "_Dynamic_" + call, None)
    if not func:
      raise apiproxy_errors.CallNotFoundError()
    func(request, response)
    assert response.IsInitialized(explanation), explanation

  def _CreateDb(self, appid):
    """Creates the database for an app and installs both design documents."""
    import copy
    self._server.create(appid)
    db = self._server[appid]
    # Store copies so that nothing the client library or later index updates
    # do to a stored document can leak into the class-level templates.
    db['_design/standard_indexes'] = copy.deepcopy(self.STANDARD_INDEXES_DOC)
    db['_design/composite_indexes'] = copy.deepcopy(
        self.COMPOSITE_INDEXES_TEMPLATE)

  def _GetDb(self, appid):
    """Returns the database for an app, creating it on first use if needed."""
    db = self._server[appid]
    if appid not in self._known_dbs:
      try:
        db.info()
      except couchdb.client.ResourceNotFound:
        self._CreateDb(appid)
      self._known_dbs.add(appid)
    return db

  def _GetNextId(self, appid, typename):
    """Returns a new numeric ID for the given app and kind."""
    self._sequence_lock.acquire()
    try:
      key = (appid, typename)
      sequence = self._sequences.get(key, None)
      if not sequence:
        sequence = IdSequence(self._GetDb(appid), typename)
        self._sequences[key] = sequence
      return sequence.next()
    finally:
      self._sequence_lock.release()

  def _UpdateRevisions(self, docs, db):
    """Sets each document's '_rev' to the revision currently stored."""
    for doc in docs:
      doc['_rev'] = db[doc['_id']]['_rev']

  def _WriteDatastore(self, docs, db, transactional=False):
    """Writes documents, retrying on conflict unless transactional.

    Raises:
      couchdb.client.PreconditionFailed: On the first conflict when
        transactional is true.
      apiproxy_errors.ApplicationError: TIMEOUT once MAX_PUT_ATTEMPTS
        non-transactional attempts have all conflicted.
    """
    for _ in range(self.MAX_PUT_ATTEMPTS):
      try:
        db.update(docs)
        return
      except couchdb.client.PreconditionFailed:
        if transactional:
          raise
        self._UpdateRevisions(docs, db)
    # Only non-transactional writes get here: transactional ones re-raise on
    # their first conflict.
    raise apiproxy_errors.ApplicationError(datastore_pb.Error.TIMEOUT)

  def _UpdateIndexes(self, app, doc):
    """Caches the composite index definitions of `doc`, grouped by kind."""
    indexes = collections.defaultdict(list)
    # Group indexes by entity type
    for idx in doc['indexes']:
      indexes[idx['def'][0]].append(idx)
    self._composite_indexes[app] = indexes

  def _GetIndexDefinitions(self, app):
    """Returns the composite index definitions for an app, grouped by kind."""
    if app not in self._composite_indexes:
      db = self._GetDb(app)
      self._UpdateIndexes(app, db['_design/composite_indexes'])
    return self._composite_indexes[app]

  def _AddIndex(self, app, db, idx_def, map_func):
    """Adds a new composite index.

    Args:
      app: The application ID the index belongs to.
      db: The application's database.
      idx_def: The tuple describing this index.
      map_func: The javascript map function that generates this index.

    Returns:
      The ID assigned to the new index.

    Raises:
      apiproxy_errors.ApplicationError: TIMEOUT if the design document could
        not be updated within MAX_PUT_ATTEMPTS attempts.
    """
    for _ in range(self.MAX_PUT_ATTEMPTS):
      try:
        doc = db['_design/composite_indexes']
        idx_id = doc['next_index']
        doc['next_index'] += 1
        doc['views'][str(idx_id)] = { 'map': map_func }
        doc['indexes'].append({'id': idx_id, 'def': idx_def})
        db['_design/composite_indexes'] = doc
        self._UpdateIndexes(app, doc)
        return idx_id
      except couchdb.client.PreconditionFailed:
        # Somebody else changed the design document; reload and try again.
        pass
    # Max retries reached
    raise apiproxy_errors.ApplicationError(datastore_pb.Error.TIMEOUT)

  def _DeleteIndex(self, idx_id):
    """Deletes a composite index. Not implemented."""
    pass

  def _Dynamic_Put(self, put_request, put_response):
    """Stores entities, allocating IDs for keys that have neither id nor name.

    Inside a transaction the documents are only queued until commit.
    """
    entities = put_request.entity_list()
    if not entities:
      return
    app = entities[0].key().app()
    assert all(x.key().app() == app for x in entities)
    keys = []
    documents = []
    for entity in entities:
      assert entity.has_key()
      assert entity.key().path().element_size() > 0
      key = entity_pb.Reference()
      key.CopyFrom(entity.key())
      keys.append(key)
      last_path = key.path().element_list()[-1]
      if last_path.id() == 0 and not last_path.has_name():
        last_path.set_id(self._GetNextId(app, last_path.type()))
        assert entity.entity_group().element_size() == 0
      else:
        assert (entity.has_entity_group() and
                entity.entity_group().element_size() > 0)
      documents.append(EntityToDocument(entity, key))
    if put_request.has_transaction():
      txid = put_request.transaction().handle()
      self._transaction_documents[txid][app].extend(documents)
    else:
      self._WriteDatastore(documents, self._GetDb(app))
    put_response.key_list().extend(keys)

  def _Dynamic_Get(self, get_request, get_response):
    """Fetches entities by key.

    One result slot is added per requested key; the slot's entity is left
    unset when no document exists for the key.
    """
    for key in get_request.key_list():
      db = self._GetDb(key.app())
      group = get_response.add_entity()
      try:
        document = db[PathToString(key.path())]
      except couchdb.client.ResourceNotFound:
        continue
      entity = group.mutable_entity()
      entity.mutable_key().CopyFrom(key)
      DocumentToEntity(document, entity)

  def _Dynamic_Delete(self, delete_request, delete_response):
    """Deletes entities by key; keys with no stored document are ignored.

    Inside a transaction the deletions are only queued until commit.
    """
    keys = delete_request.key_list()
    if not keys:
      return
    app = keys[0].app()
    assert all(x.app() == app for x in keys)
    db = self._GetDb(app)
    documents = []
    for key in keys:
      docid = PathToString(key.path())
      # A deletion has to name the revision it replaces.
      try:
        rev = db[docid]['_rev']
      except couchdb.client.ResourceNotFound:
        continue
      documents.append({'_id': docid, '_rev': rev, '_deleted': True})
    if delete_request.has_transaction():
      txid = delete_request.transaction().handle()
      self._transaction_documents[txid][app].extend(documents)
    elif documents:
      self._WriteDatastore(documents, db)

  def _CustomIndexPlanner(self, db, query, idxspec):
    """Satisfies queries that require a custom index.

    Raises:
      apiproxy_errors.ApplicationError: NEED_INDEX if no defined composite
        index can serve the query.
    """
    indexes = self._GetIndexDefinitions(query.app()).get(idxspec.type, [])
    for idx in indexes:
      plan = idxspec.ConstructPlan(db, query, 'composite_indexes/%d'
                                   % (idx['id']), idx['def'])
      if plan:
        return plan
    raise apiproxy_errors.ApplicationError(
        datastore_pb.Error.NEED_INDEX,
        "This query requires a composite index that is not defined. "
        "You must update the index.yaml file in your application root.")

  def _SingleEqualityIndexPlanner(self, db, query, idxspec):
    """Satisfies queries that specify a kind and a single equality."""
    fld = list(idxspec.eq_fields)[0]
    startkey = [idxspec.type, fld, idxspec.eq_values[fld][0]]
    endkey = startkey + [{}]
    return QueryPlan(db, 'standard_indexes/EntitiesByProperty',
                     idxspec.exclusive_max, startkey=startkey, endkey=endkey,
                     skip=query.offset())

  def _SingleInequalityIndexPlanner(self, db, query, idxspec):
    """Satisfies queries with a kind and a single range or sort property."""
    fld, desc = idxspec.range_fields[0]
    startkey = [idxspec.type, fld] + idxspec.idx_min
    endkey = [idxspec.type, fld] + idxspec.idx_max
    if desc == datastore_index.DESCENDING:
      # A descending scan starts from the upper bound.
      return QueryPlan(db, 'standard_indexes/EntitiesByProperty',
                       idxspec.exclusive_max, startkey=endkey, endkey=startkey,
                       descending=True, skip=query.offset())
    else:
      return QueryPlan(db, 'standard_indexes/EntitiesByProperty',
                       idxspec.exclusive_max, startkey=startkey, endkey=endkey,
                       descending=False, skip=query.offset())

  def _KindIndexPlanner(self, db, query, idxspec):
    """Satisfies queries that specify only an entity."""
    return QueryPlan(db, 'standard_indexes/Entities', idxspec.exclusive_max,
                     key=idxspec.type, skip=query.offset())

  def _GetQueryPlan(self, query):
    """Picks the planner that matches the shape of the query."""
    app = query.app()
    db = self._GetDb(app)
    idxspec = ParseQuery(query)
    if not idxspec.eq_fields and not idxspec.range_fields:
      return self._KindIndexPlanner(db, query, idxspec)
    elif len(idxspec.eq_fields) == 1 and not idxspec.range_fields:
      return self._SingleEqualityIndexPlanner(db, query, idxspec)
    elif not idxspec.eq_fields and len(idxspec.range_fields) == 1:
      return self._SingleInequalityIndexPlanner(db, query, idxspec)
    else:
      return self._CustomIndexPlanner(db, query, idxspec)

  def _Dynamic_RunQuery(self, query, query_result):
    """Plans a query and registers a cursor for fetching its results."""
    plan = self._GetQueryPlan(query)
    self._cursor_lock.acquire()
    try:
      self._cursors[self._next_cursor_id] = (query.app(), plan)
      query_result.mutable_cursor().set_cursor(self._next_cursor_id)
      query_result.set_more_results(plan.HasMore())
      self._next_cursor_id += 1
    finally:
      self._cursor_lock.release()

  def _Dynamic_Next(self, next_request, query_result):
    """Returns the next batch of results for a cursor.

    An unknown cursor yields no results and more_results=False.
    """
    cursor_id = next_request.cursor().cursor()
    app, q = self._cursors.get(cursor_id, (None, None))
    if not q:
      query_result.set_more_results(False)
      return
    for result_doc in q.Next(next_request.count()):
      result = query_result.add_result()
      key = result.mutable_key()
      key.set_app(app)
      path = key.mutable_path()
      TupleToPath(result_doc['path'], path)
      DocumentToEntity(result_doc, result)
    query_result.set_more_results(q.HasMore())

  def _Dynamic_Count(self, query, integer64proto):
    """Stores the row count reported for the query's plan."""
    q = self._GetQueryPlan(query)
    if not q:
      integer64proto.set_value(0)
    else:
      integer64proto.set_value(q.Count())

  def _Dynamic_BeginTransaction(self, request, transaction):
    """Opens a transaction and assigns it a fresh handle."""
    self._transaction_lock.acquire()
    try:
      transaction.set_handle(self._next_transaction_id)
      txdocs = collections.defaultdict(list)
      self._transaction_documents[self._next_transaction_id] = txdocs
      self._next_transaction_id += 1
    finally:
      self._transaction_lock.release()

  def _Dynamic_Commit(self, transaction, transaction_response):
    """Writes the documents queued for a transaction, then forgets it.

    Raises:
      apiproxy_errors.ApplicationError: CONCURRENT_TRANSACTION if any write
        conflicts.
    """
    txid = transaction.handle()
    try:
      for app, docs in self._transaction_documents[txid].iteritems():
        db = self._GetDb(app)
        self._WriteDatastore(docs, db, True)
    except couchdb.client.PreconditionFailed:
      raise apiproxy_errors.ApplicationError(
          datastore_pb.Error.CONCURRENT_TRANSACTION)
    finally:
      # pop() keeps cleanup from raising a second error (and hiding the first
      # one) when the handle is unknown.
      self._transaction_documents.pop(txid, None)

  def _Dynamic_Rollback(self, transaction, transaction_response):
    """Discards the documents queued for a transaction."""
    self._transaction_documents.pop(transaction.handle(), None)

  def _Dynamic_GetSchema(self, app_str, schema):
    """Not implemented."""
    pass

  def _GenerateJsMapFunc(self, idx_def):
    """Generates the javascript map function for a composite index.

    Args:
      idx_def: Sequence (kind, has_ancestor, name1, direction1, ...).

    Returns:
      Javascript source that emits one key per combination of the indexed
      property values of a document of the given kind.
    """
    # Each stage fills the single remaining '%s' slot with a block that
    # itself contains the slot for the next stage.
    map_func = ("function(doc) {"
                "key=Array();"
                "idx=Array();"
                "if(doc.type == '%s') {"
                "%%s"
                "}"
                "}" % (idx_def[0],))
    if idx_def[1]:
      # Ancestor
      # NOTE(review): this pushes a slice of doc.path, while queries put the
      # PathToString() form of the ancestor in the key - confirm the intended
      # key format before relying on ancestor indexes.
      map_func %= ("for(var aidx = 0; aidx < doc.path.length - 2; aidx += 2) {"
                   "key.push(doc.path.slice(0, aidx));"
                   "%s"
                   "key.pop();"
                   "}")
    prev_indexes = {}
    for i in range(len(idx_def) // 2 - 1):
      propname = idx_def[i*2 + 2]
      if propname in prev_indexes:
        # A repeated property continues after the value chosen by the
        # previous loop over the same property.
        start = "idx[%d] + 2" % (prev_indexes[propname])
      else:
        start = "0"
      prev_indexes[propname] = i
      # Property values live under doc.p as flat (value, meaning, ...) lists.
      map_func %= ("for(idx[%(num)d] = %(start)s; "
                   "idx[%(num)d] < doc.p.%(prop)s.length; idx[%(num)d] += 2) {"
                   "key.push(doc.p.%(prop)s[idx[%(num)d]]);"
                   "%%s"
                   "key.pop();"
                   "}" % {"num": i, "prop": propname, "start": start})
    map_func %= "emit(key, null);"
    return map_func

  def _Dynamic_CreateIndex(self, index, id_response):
    """Creates a composite index and returns its ID in id_response."""
    app = index.app_id()
    idx = index.definition()
    idx_props = idx.property_list()
    assert all(idx_props[0].direction() == x.direction()
               for x in idx_props), (
        "Indexes with multiple sort orders not yet supported.")
    idx_def = ((idx.entity_type(), idx.ancestor()) +
               sum(((x.name(), x.direction()) for x in idx_props), ()))
    map_func = self._GenerateJsMapFunc(idx_def)
    idx_id = self._AddIndex(app, self._GetDb(app), idx_def, map_func)
    id_response.set_value(idx_id)

  def _Dynamic_GetIndices(self, app_str, composite_indices):
    """Not implemented."""
    pass

  def _Dynamic_UpdateIndex(self, index, void):
    """Not implemented."""
    pass

  def _Dynamic_DeleteIndex(self, index, void):
    """Not implemented."""
    pass

  def _EndRequest(self):
    """Frees per-request resources."""
    if self._transaction_documents:
      logging.warn("Cleaning up %d forgotten transactions."
                   % (len(self._transaction_documents)))
    self._transaction_documents = {}
    self._cursors = {}
import cStringIO
import couchdb
import datetime
import os
import mocker
import unittest
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_admin
from google.appengine.api import users
from google.appengine.datastore import datastore_index
from google.appengine.ext import db
import datastore_couchdb_stub
# Minimal model used by most tests: one string and one integer property.
class TestModel(db.Model):
a_string = db.StringProperty()
an_integer = db.IntegerProperty()
# Model declaring one property of each kind used by testFullPut. Most carry a
# default, so an instance can be stored without setting them; the reference
# and user properties are assigned by the test, and null_property is left
# unset.
class FullModel(db.Model):
a_string = db.StringProperty(default="foo")
a_bool = db.BooleanProperty(default=True)
an_integer = db.IntegerProperty(default=42)
a_float = db.FloatProperty(default=1.999)
a_datetime = db.DateTimeProperty(default=datetime.datetime(2008, 10, 01))
a_list = db.ListProperty(int, default=[1,2,3])
a_reference = db.SelfReferenceProperty()
a_user = db.UserProperty()
a_blob = db.BlobProperty(default="Blobby!")
some_text = db.TextProperty(default="Lorem Ipsum Dolor Si Amet")
null_property = db.IntegerProperty()
class MockViewRow(object):
  """Plain stand-in for a view result row: exposes id, key and value."""

  def __init__(self, id, key, value):
    self.id, self.key, self.value = id, key, value
class TestDatastore(mocker.MockerTestCase):
# Installs a fresh API proxy with a DatastoreCouchDBStub backed by mocked
# couchdb Server and Database objects, and prepares shared fixture data.
def setUp(self):
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
self.mock_server = self.mocker.mock(couchdb.client.Server)
stub = datastore_couchdb_stub.DatastoreCouchDBStub(self.mock_server)
apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)
self.mock_db = self.mocker.mock(couchdb.client.Database)
# Recorded expectation: looking up the "test" database returns mock_db.
# count(1, None) presumably means "one or more times" - confirm against the
# mocker documentation.
self.mock_server["test"]
self.mocker.count(1, None)
self.mocker.result(self.mock_db)
# Recorded expectation: the stub probes the database with info(). This is
# the last expression recorded here, so a test may attach a result or an
# exception to it before recording anything else.
self.mock_db.info()
os.environ['APPLICATION_ID'] = "test"
os.environ['AUTH_DOMAIN'] = "mydomain.com"
# Document expected for TestModel(key_name="foo") with a_string="foo" and
# an_integer=42.
self.test_entity = {
'path': ('TestModel', 'foo'),
'type': 'TestModel',
'_id': 'CxIJVGVzdE1vZGVsIgNmb28M',
'p': {
'an_integer': (42L, None),
'a_string': ('foo', None),
},
'r': {},
}
# (number, document id) pairs used to build mocked query responses.
self.result_keys = [
(1, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYAgw'),
(2, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYAww'),
(3, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBAw'),
(4, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBQw'),
(5, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBgw'),
(6, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYAQw'),
(7, 'agR0ZXN0cg8LEglUZXN0TW9kZWwYBww'),
]
# Covers three puts: a named key, a generated ID, and a generated ID for a
# child entity while the sequence document is under contention.
def testPut(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# Expect update to be called with an entity
mock_db.update([self.test_entity])
# Expect to be queried for a sequence
# A batch size of 1 forces a sequence write for every generated ID.
datastore_couchdb_stub.IdSequence.BATCH_SIZE = 1
mock_db['_TestModel_seq']
m.throw(couchdb.client.ResourceNotFound())
mock_db['_TestModel_seq'] = {'next':2}
mock_db.update([
{
'path': ('TestModel', 1),
'type': 'TestModel',
'_id': 'CxIJVGVzdE1vZGVsGAEM',
'p': {
'an_integer': (24L, None),
'a_string': ('oof', None),
},
'r': {},
},
])
# Simulate contention on the sequence counter
# The first write fails, the stored sequence is reloaded (next=5), and the
# retried write succeeds, so the child entity receives ID 5.
mock_db['_TestModel_seq'] = {'next':3}
m.throw(couchdb.client.PreconditionFailed())
mock_db['_TestModel_seq']
m.result({'_id':'_TestModel_seq','_rev':'12345','next':5})
mock_db['_TestModel_seq'] = {'_id':'_TestModel_seq','_rev':'12345','next':6}
mock_db.update([
{
'path': ('TestModel', 1, 'TestModel', 5),
'type': 'TestModel',
'_id': 'CxIJVGVzdE1vZGVsGAEMCxIJVGVzdE1vZGVsGAUM',
'p': {
'an_integer': (25L, None),
'a_string': ('oof', None),
},
'r': {},
},
])
m.replay()
# Test a model with a name provided
mymodel = TestModel(key_name="foo")
mymodel.a_string = "foo"
mymodel.an_integer = 42
mymodel.put()
# Test a model with a generated ID
mymodel = TestModel()
mymodel.a_string = "oof"
mymodel.an_integer = 24
mymodel.put()
# Test a model with another generated ID
mymodel = TestModel(parent=mymodel)
mymodel.a_string = "oof"
mymodel.an_integer = 25
mymodel.put()
# Checks the stored document for an entity that uses every property kind
# declared on FullModel.
def testFullPut(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# Expect update to be called with an entity
# Regular properties are expected under 'p' and the text and blob properties
# under 'r'; each is a flat (value, meaning, ...) tuple, and the blob value
# is base64 text.
mock_db.update([
{
'path': ('FullModel', 'test'),
'type': 'FullModel',
'_id': 'CxIJRnVsbE1vZGVsIgR0ZXN0DA',
'p': {
'a_list': (1L, None, 2L, None, 3L, None),
'a_user': (
{
't': 'user',
'auth_domain': 'mydomain.com',
'email': 'foo@bar.com'
}, None),
'a_datetime': (1222819200000000L, 7),
'an_integer': (42L, None),
'a_string': ('foo', None),
'a_bool': (True, None),
'null_property': (None, None),
'a_reference': ({
'ref': 'CxIJRnVsbE1vZGVsIgR0ZXN0DA',
't': 'ref'}, None),
'a_float': (1.9990000000000001, None)
},
'r': {
'some_text': ('Lorem Ipsum Dolor Si Amet', 15),
'a_blob': ('QmxvYmJ5IQ==', 14),
},
}
])
m.replay()
mymodel = FullModel(key_name="test")
mymodel.a_user = users.User("foo@bar.com")
# The entity references itself, so 'ref' above equals its own '_id'.
mymodel.a_reference = db.Key.from_path('FullModel', 'test')
mymodel.put()
# Checks that a non-transactional put which hits a conflict fetches the
# current revision and retries with it.
def testConflicts(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# Expect update to be called with an entity
mock_db.update([self.test_entity])
m.throw(couchdb.client.PreconditionFailed())
# Expect to be queried for a new revision ID
mock_db['CxIJVGVzdE1vZGVsIgNmb28M']
m.result({'_id':'CxIJVGVzdE1vZGVsIgNmb28M', '_rev': '12345'})
# Expect another update with the new revision ID
new_test_entity = dict(self.test_entity)
new_test_entity['_rev'] = '12345'
mock_db.update([new_test_entity])
m.replay()
mymodel = TestModel(key_name="foo")
mymodel.a_string = "foo"
mymodel.an_integer = 42
mymodel.put()
# Checks that a missing database is created and given both design documents
# before the first write.
def testCreateDB(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# No expression is recorded in this test before the throw, so it presumably
# attaches to the mock_db.info() call recorded last in setUp - confirm
# against the mocker documentation.
m.throw(couchdb.client.ResourceNotFound())
mock_server.create("test")
# Expect a design doc to be created
mock_db["_design/standard_indexes"] = \
datastore_couchdb_stub.DatastoreCouchDBStub.STANDARD_INDEXES_DOC
mock_db["_design/composite_indexes"] = \
datastore_couchdb_stub.DatastoreCouchDBStub.COMPOSITE_INDEXES_TEMPLATE
# Expect update to be called with an entity
mock_db.update([self.test_entity])
m.replay()
mymodel = TestModel(key_name="foo")
mymodel.a_string = "foo"
mymodel.an_integer = 42
mymodel.put()
# Stores an entity, fetches it back by key name and compares the properties.
def testGet(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# Expect update to be called with an entity
mock_db.update([self.test_entity])
# Expect an entity to be fetched
mock_db[self.test_entity['_id']]
# The fetched document is the stored one plus a revision ID.
new_test_entity = dict(self.test_entity)
new_test_entity['_rev'] = '12345'
m.result(new_test_entity)
m.replay()
mymodel = TestModel(key_name="foo")
mymodel.a_string = "foo"
mymodel.an_integer = 42
mymodel.put()
mymodel2 = TestModel.get_by_key_name("foo")
self.failUnlessEqual(mymodel.a_string, mymodel2.a_string)
self.failUnlessEqual(mymodel.an_integer, mymodel2.an_integer)
# Fetches an entity and deletes it, expecting a deletion document that
# carries the current revision.
def testDelete(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# Expect a fetch
mock_db[self.test_entity['_id']]
new_test_entity = dict(self.test_entity)
new_test_entity['_rev'] = '12345'
m.result(new_test_entity)
# The document is read exactly twice: once by get_by_key_name and once when
# the delete looks up the revision ID.
m.count(2, 2)
# Expect a delete
mock_db.update([{
'_id': new_test_entity['_id'],
'_rev': new_test_entity['_rev'],
'_deleted': True,
}])
m.replay()
mymodel = TestModel.get_by_key_name('foo')
mymodel.delete()
# Records the expectations for one view query: a result object reporting
# len(keys) rows, followed by one document fetch per key.
#
# Args:
#   mock_db: The mocked database the document fetches are recorded on.
#   keys: Sequence of (number, document id) pairs, as in self.result_keys.
def mockQueryResponse(self, mock_db, keys):
m = self.mocker
result = m.mock(couchdb.client.ViewResults)
# Presumably attaches to the expression the caller recorded last (its
# mock_db.view(...) call) - confirm against the mocker documentation.
m.result(result)
result.total_rows
m.result(len(keys))
result.rows
m.result([MockViewRow(x[1], 'TestModel', None) for x in keys])
m.count(1, None)
# Each row's document is fetched by ID; the number i is used for the path ID
# and for both property values.
for i, key in keys:
mock_db[key]
m.result({
'_id': key,
'_rev': '12345',
'path': ['TestModel', i],
'type': 'TestModel',
'r': {},
'p': {
'an_integer': [i, None],
'a_string': ['String number %d' % (i,), None]
}
})
def testQuery(self):
    """Tests a kind-only query, served by the standard 'Entities' view."""
    mock_db = self.mock_db
    mock_db.view('standard_indexes/Entities', count=20, key='TestModel', skip=0)
    # mockQueryResponse registers the ViewResults mock returned by the view
    # call recorded above, so no separate result mock is set up here.
    self.mockQueryResponse(mock_db, self.result_keys[:5])
    self.mocker.replay()
    # Test a simple kind-only query.
    for i, tm in enumerate(TestModel.all().fetch(20)):
        self.assertEqual(tm.an_integer, i + 1)
        self.assertEqual(tm.a_string, 'String number %d' % (i + 1,))
def testPropertyIndexQueries(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
filters = [
("an_integer >=", 5, self.result_keys[:-3]),
("an_integer >", 5, self.result_keys[:-2]),
("an_integer <=", 3, self.result_keys[:3]),
]
# an_integer >= 5
mock_db.view('standard_indexes/EntitiesByProperty', count=20,
startkey=['TestModel', 'an_integer', 5L],
endkey=['TestModel', 'an_integer', {}], skip=0,
descending=False)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
self.mockQueryResponse(mock_db, filters[0][2])
# an_integer > 5
mock_db.view('standard_indexes/EntitiesByProperty', count=20,
startkey=['TestModel', 'an_integer', 5L, None],
endkey=['TestModel', 'an_integer', {}], skip=0,
descending=False)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
self.mockQueryResponse(mock_db, filters[1][2])
# an_integer <= 3
mock_db.view('standard_indexes/EntitiesByProperty', count=20,
startkey=['TestModel', 'an_integer'],
endkey=['TestModel', 'an_integer', 3L], skip=0,
descending=False)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
self.mockQueryResponse(mock_db, filters[2][2])
# an_integer < 3
mock_db.view('standard_indexes/EntitiesByProperty', count=20,
startkey=['TestModel', 'an_integer'],
endkey=['TestModel', 'an_integer', 3L], skip=0,
descending=False)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
self.mockQueryResponse(mock_db, filters[2][2])
# 3 <= an_integer <= 5
mock_db.view('standard_indexes/EntitiesByProperty', count=20,
startkey=['TestModel', 'an_integer', 3L],
endkey=['TestModel', 'an_integer', 5L], skip=0,
descending=False)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
self.mockQueryResponse(mock_db, self.result_keys[2:4])
m.replay()
# Test a query specifying a greater-than constraint on one property
for filter, val, keys in filters:
q = TestModel.all().filter(filter, val).fetch(20)
for (i, key), tm in zip(keys, q):
self.failUnlessEqual(tm.an_integer, i)
self.failUnlessEqual(tm.a_string, 'String number %d' % (i,))
q = TestModel.all().filter("an_integer <", 3).fetch(20)
for (i, key), tm in zip(self.result_keys[:2], q):
self.failUnlessEqual(tm.an_integer, i)
self.failUnlessEqual(tm.a_string, 'String number %d' % (i,))
q = TestModel.all().filter("an_integer >=", 3).filter("an_integer <=", 5)
for (i, key), tm in zip(self.result_keys[2:4], q.fetch(20)):
self.failUnlessEqual(tm.an_integer, i)
self.failUnlessEqual(tm.a_string, 'String number %d' % (i,))
def testFullQuery(self):
"""Tests querying for an entity with a full set of properties."""
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
mock_db.view('standard_indexes/EntitiesByProperty', count=20,
startkey=['FullModel', 'a_string', 'foo'],
endkey=['FullModel', 'a_string', 'foo', {}], skip=0)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
result = m.mock(couchdb.client.ViewResults)
m.result(result)
result.total_rows
m.result(1)
result.rows
m.result([MockViewRow(x[1], 'FullModel', None)
for x in self.result_keys][:1])
m.count(1, None)
for i, key in self.result_keys[:1]:
mock_db[key]
m.result({
'_id': key,
'_rev': '12345',
'path': ['FullModel', i],
'type': 'FullModel',
'r': {
'some_text': ['Lorem Ipsum Dolor Si Amet', 15],
'a_blob': ['QmxvYmJ5IQ==', 14]
},
'p': {
'a_list': [1, None, 2, None, 3, None],
'a_datetime': [1222819200000000L, 7],
'a_user': [None, None],
'an_integer': [42, None],
'a_string': ['foo', None],
'a_bool': [True, None],
'null_property': [None, None],
'a_reference': ({
'ref': 'CxIJRnVsbE1vZGVsIgR0ZXN0DA',
't': 'ref'}, None),
'a_float': [1.9990000000000001, None]
}
})
m.replay()
# Test an equality query
FullModel.all().filter('a_string =', 'foo').fetch(20)
def testBasicSort(self):
    """Tests single-property sort orders, ascending and descending.

    Both queries are expected to be served by the 'EntitiesByProperty' view;
    the descending one swaps startkey and endkey and passes descending=True.
    """
    mock_db = self.mock_db
    # Sorted by an_integer ascending
    mock_db.view('standard_indexes/EntitiesByProperty', count=20,
                 startkey=['TestModel', 'an_integer'],
                 endkey=['TestModel', 'an_integer', {}], skip=0,
                 descending=False)
    # mockQueryResponse registers the ViewResults mock for the call above.
    self.mockQueryResponse(mock_db, self.result_keys)
    # Sorted by a_string descending
    mock_db.view('standard_indexes/EntitiesByProperty', count=20,
                 startkey=['TestModel', 'a_string', {}],
                 endkey=['TestModel', 'a_string'], descending=True, skip=0)
    self.mockQueryResponse(mock_db, self.result_keys[::-1])
    self.mocker.replay()

    expected_count = len(self.result_keys)
    ascending = TestModel.all().order("an_integer").fetch(20)
    self.assertEqual(len(ascending), expected_count)
    descending = TestModel.all().order("-a_string").fetch(20)
    self.assertEqual(len(descending), expected_count)
def testOffset(self):
    """Tests that a fetch offset is passed to the view as 'skip'.

    Covers an ordered query ('EntitiesByProperty' view) and a kind-only
    query ('Entities' view), each fetched with an offset of 5.
    """
    mock_db = self.mock_db
    remaining_keys = self.result_keys[5:]
    # Order and offset
    mock_db.view('standard_indexes/EntitiesByProperty', count=20,
                 startkey=['TestModel', 'an_integer'],
                 endkey=['TestModel', 'an_integer', {}], skip=5,
                 descending=False)
    # mockQueryResponse registers the ViewResults mock for the call above.
    self.mockQueryResponse(mock_db, remaining_keys)
    # Simple entity query with offset
    mock_db.view('standard_indexes/Entities', count=20, key='TestModel', skip=5)
    self.mockQueryResponse(mock_db, remaining_keys)
    self.mocker.replay()

    ordered = TestModel.all().order("an_integer").fetch(20, 5)
    self.assertEqual(len(ordered), 2)
    unordered = TestModel.all().fetch(20, 5)
    self.assertEqual(len(unordered), 2)
def testCount(self):
    """Tests count() on a kind-only query.

    The mocked view reports total_rows of 5 and the query's count() is
    expected to return 5.
    """
    m = self.mocker
    mock_db = self.mock_db
    mock_db.view('standard_indexes/Entities', count=20, key='TestModel', skip=0)
    result = m.mock(couchdb.client.ViewResults)
    m.result(result)
    result.total_rows
    m.result(5)
    # NOTE(review): the mocked rows come from result_keys[5:], which need not
    # contain 5 entries; the assertion below matches total_rows -- confirm
    # this mismatch is intentional.
    result.rows
    m.result([MockViewRow(x[1], 'TestModel', None)
              for x in self.result_keys[5:]])
    # The rows may be read any number of times, but at least once.
    m.count(1, None)
    m.replay()
    self.assertEqual(TestModel.all().count(), 5)
def testTransactions(self):
m = self.mocker
mock_server = self.mock_server
mock_db = self.mock_db
# Expect a fetch
mock_db[self.test_entity['_id']]
new_test_entity = dict(self.test_entity)
new_test_entity['_rev'] = '12345'
m.result(new_test_entity)
# Expect a put, and simulate contention
mock_db.update([{
'path': ('TestModel', 'foo'),
'type': 'TestModel',
'_id': 'CxIJVGVzdE1vZGVsIgNmb28M',
'_rev': '12345',
'p': {
'an_integer': (123L, None),
'a_string': ('foo', None),
},
'r': {},
}])
m.throw(couchdb.client.PreconditionFailed())
# Expect another fetch, and another put
mock_db[self.test_entity['_id']]
new_test_entity = dict(self.test_entity)
new_test_entity['_rev'] = '12346'
m.result(new_test_entity)
m.count(2, 2) # Once for the succeeding tx, once for the failing one.
# Expect a put, and simulate contention
mock_db.update([{
'path': ('TestModel', 'foo'),
'type': 'TestModel',
'_id': 'CxIJVGVzdE1vZGVsIgNmb28M',
'_rev': '12346',
'p': {
'an_integer': (123L, None),
'a_string': ('foo', None),
},
'r': {},
}])
m.replay()
class TestError(Exception): pass
def testTransaction(fail):
obj = TestModel.get_by_key_name("foo")
obj.an_integer = 123
obj.put()
if fail: raise TestError()
db.run_in_transaction(testTransaction, False)
try:
db.run_in_transaction(testTransaction, True)
except TestError:
pass
def testCreateIndexes(self):
    """Tests creating composite indexes from an index.yaml definition.

    Three indexes are created one at a time. The first two writes of the
    composite index design document may have any value; one write must
    equal the complete document holding all three indexes and their
    generated map functions.
    """
    m = self.mocker
    mock_db = self.mock_db
    # Expect the composite indexes design doc to be fetched
    mock_db['_design/composite_indexes']
    m.result({
        'language': 'javascript',
        'views': {},
        'indexes': [],
        'next_index': 1,
    })
    m.count(1, None)
    # First two indexes
    mock_db['_design/composite_indexes'] = mocker.ANY
    m.count(2)
    # Third request includes all 3 indexes
    # NOTE(review): the expected ancestor map function below contains
    # "len(doc.path)", which looks like Python rather than JavaScript. The
    # string has to match what the stub generates, so any fix belongs there.
    mock_db['_design/composite_indexes'] = {
        'next_index': 4,
        'indexes': [
            {'id': 1, 'def': ('TestModel', False, 'a_string', 1, 'an_integer', 1)},
            {'id': 2, 'def': ('TestModel', True, 'a_string', 1)},
            {'id': 3, 'def': ('FullModel', False, 'a_list', 1, 'a_list', 1)}
        ],
        'language': 'javascript',
        'views': {
            '1': {  # Basic index on 2 properties
                'map': "function(doc) {"
                       "key=Array();idx=Array();"
                       "if(doc.type == 'TestModel') {"
                       "for(idx[0] = 0; idx[0] < doc.a_string.length; idx[0] += 2) {"
                       "key.push(doc.a_string[idx[0]]);"
                       "for(idx[1] = 0; idx[1] < doc.an_integer.length; idx[1] += 2) {"
                       "key.push(doc.an_integer[idx[1]]);"
                       "emit(key, null);"
                       "key.pop();"
                       "}"
                       "key.pop();"
                       "}"
                       "}"
                       "}"
            },
            '2': {  # Ancestor index on 1 property
                'map': "function(doc) {"
                       "key=Array();idx=Array();"
                       "if(doc.type == 'TestModel') {"
                       "for(var aidx = 0; aidx < len(doc.path) - 2; aidx += 2) {"
                       "key.push(doc.path.slice(0, aidx));"
                       "for(idx[0] = 0; idx[0] < doc.a_string.length; idx[0] += 2) {"
                       "key.push(doc.a_string[idx[0]]);"
                       "emit(key, null);"
                       "key.pop();"
                       "}"
                       "key.pop();"
                       "}"
                       "}"
                       "}"
            },
            '3': {  # Multiply-indexed single property
                'map': "function(doc) {"
                       "key=Array();idx=Array();"
                       "if(doc.type == 'FullModel') {"
                       "for(idx[0] = 0; idx[0] < doc.a_list.length; idx[0] += 2) {"
                       "key.push(doc.a_list[idx[0]]);"
                       "for(idx[1] = idx[0] + 2; idx[1] < doc.a_list.length; idx[1] += 2) {"
                       "key.push(doc.a_list[idx[1]]);"
                       "emit(key, null);"
                       "key.pop();"
                       "}"
                       "key.pop();"
                       "}"
                       "}"
                       "}"
            },
        }
    }
    m.replay()
    # Create a set of indexes specified in an index.yaml file
    indexes = datastore_index.ParseIndexDefinitions("""
indexes:
# Basic (simple) index
- kind: TestModel
  properties:
  - name: a_string
  - name: an_integer
# With ancestor
- kind: TestModel
  ancestor: yes
  properties:
  - name: a_string
# Multiply indexed field
- kind: FullModel
  properties:
  - name: a_list
  - name: a_list
""").indexes
    requested = datastore_admin.IndexDefinitionsToProtos("test", indexes)
    # Create the indexes in definition order: the expected design document
    # above pins ids 1, 2 and 3 to the first, second and third definition,
    # so the order must not depend on dict iteration order.
    for index in requested:
        datastore_admin.CreateIndex(index)
def testCompositeIndexQuery(self):
    """Tests queries that are served by a composite index view.

    The composite index design document lists three indexes; all three
    queries in this test are expected to use view 'composite_indexes/1'
    (TestModel on a_string, an_integer).
    """
    m = self.mocker
    mock_db = self.mock_db
    mock_db['_design/composite_indexes']
    m.result({
        'next_index': 4,
        'indexes': [
            {'id': 1, 'def': ('TestModel', False, 'a_string', 1, 'an_integer', 1)},
            {'id': 2, 'def': ('TestModel', True, 'a_string', 1)},
            {'id': 3, 'def': ('FullModel', False, 'a_list', 1, 'a_list', 1)}
        ],
        'language': 'javascript',
        'views': {}
    })

    def expectIndexView(startkey, endkey, keys):
        # Record one call to composite index view 1; mockQueryResponse then
        # registers the ViewResults mock that call returns.
        mock_db.view('composite_indexes/1', count=20,
                     startkey=startkey, endkey=endkey, skip=0,
                     descending=False)
        self.mockQueryResponse(mock_db, keys)

    # Equality filter on two properties
    expectIndexView(['String number 5', 5],
                    ['String number 5', 5, {}],
                    self.result_keys[4:5])
    # Equality and range
    expectIndexView(['String number 5', 1, None],
                    ['String number 5', 10],
                    self.result_keys[4:5])
    # Range and sort order
    expectIndexView(['String number 2', None],
                    ['String number 6'],
                    self.result_keys[2:5])
    m.replay()

    # Equality filter on two properties
    q = TestModel.all().filter("a_string =", "String number 5")
    q.filter("an_integer =", 5)
    tm = q.fetch(20)
    self.assertEqual(len(tm), 1)
    self.assertEqual(tm[0].an_integer, 5)
    self.assertEqual(tm[0].a_string, 'String number 5')

    # Equality and range
    q = TestModel.all().filter("a_string =", "String number 5")
    q.filter("an_integer >", 1)
    q.filter("an_integer <", 10)
    tm = q.fetch(20)
    self.assertEqual(len(tm), 1)
    self.assertEqual(tm[0].an_integer, 5)
    self.assertEqual(tm[0].a_string, 'String number 5')

    # Range and sort order
    q = TestModel.all()
    q.filter("a_string >", "String number 2")
    q.filter("a_string <", "String number 6")
    q.order("a_string")
    q.order("an_integer")
    tm = q.fetch(20)
    self.assertEqual(len(tm), 3)
    self.assertEqual(tm[0].an_integer, 3)
    self.assertEqual(tm[0].a_string, 'String number 3')
# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment