Created
September 8, 2011 12:34
-
-
Save jbenet/1203286 to your computer and use it in GitHub Desktop.
datastore api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Datastore(object):
    '''Abstract storage interface for serialized dronestore versions.

    A Datastore hides a concrete storage facility (in-memory cache, database,
    remote cache, flat files on disk, ...) behind one small, uniform key/value
    interface, so the right tool can be used for each job without changing the
    calling code.

    Datastores may also aggregate other datastores, for instance sharding them
    (to distribute load) or tiering them (caches in front of databases).

    Implementations should accept arbitrary values where they can, but some
    will have to be specific (e.g. SQL databases where fields must be decomposed
    into columns), particularly those that support queries.

    This interface matches the Drone's well, as it supports each of its calls.

    Every method here is abstract and raises NotImplementedError; subclasses
    override the operations they support.
    '''

    def get(self, key):
        '''Return the object named by `key`.

        The concrete stores in this file return None when `key` is absent.
        '''
        raise NotImplementedError()

    def put(self, key, value):
        '''Store `value` under `key`.'''
        raise NotImplementedError()

    def delete(self, key):
        '''Remove the object named by `key`.'''
        raise NotImplementedError()

    def contains(self, key):
        '''Return whether an object named by `key` is in this datastore.'''
        raise NotImplementedError()

    def query(self, query):
        '''Return a sequence of objects matching the criteria in `query`.'''
        raise NotImplementedError()
class DictDatastore(Datastore):
    '''Simple straw-man in-memory datastore backed by a dict.

    WARNING: it does not evict entries, so it will grow indefinitely. Use this
    for testing, short-lived, or small working-set programs.
    '''

    def __init__(self):
        # key -> stored value; `put` never stores None (see below).
        self._items = {}

    def get(self, key):
        '''Return the object named by `key`, or None if it is not stored.'''
        return self._items.get(key)

    def put(self, key, value):
        '''Store `value` under `key`.

        Putting None is treated as a deletion, so a None result from `get`
        always means "not stored".
        '''
        if value is None:
            self.delete(key)
        else:
            self._items[key] = value

    def delete(self, key):
        '''Remove the object named by `key`; a missing key is ignored.'''
        self._items.pop(key, None)

    def contains(self, key):
        '''Return whether an object named by `key` is in this datastore.'''
        return key in self._items

    def query(self, query):
        '''Return the result of applying the callable `query` to all values.'''
        # The entire dataset is already in memory, so applying the query
        # naively is fine. A list snapshot is passed so the query sees the same
        # kind of sequence on every interpreter (Python 3 `values()` is a view).
        return query(list(self._items.values()))

    def __len__(self):
        '''Return the number of stored objects.'''
        return len(self._items)
class DatastoreCollection(Datastore):
    '''Represents an ordered collection of datastores.

    This class only manages the list of member stores; the storage operations
    (get/put/delete/contains/query) are left to subclasses.
    '''

    def __init__(self, stores=None):
        '''Initialize the collection with any provided datastores.

        `stores` may be any iterable of Datastore instances (default: none).
        The iterable is copied, so collections never share their member list
        with each other or with the caller.

        Raises TypeError if any element is not a Datastore.
        '''
        # A fresh list per instance: a mutable default (or an aliased caller
        # list) would let appendDatastore on one collection leak into others.
        stores = [] if stores is None else list(stores)

        for store in stores:
            if not isinstance(store, Datastore):
                raise TypeError("all stores must be of type %s" % Datastore)

        self._stores = stores

    def datastore(self, index):
        '''Return the datastore at position `index`.'''
        return self._stores[index]

    def appendDatastore(self, store):
        '''Add `store` at the end; raises TypeError if it is not a Datastore.'''
        if not isinstance(store, Datastore):
            raise TypeError("stores must be of type %s" % Datastore)
        self._stores.append(store)

    def removeDatastore(self, store):
        '''Remove `store`; raises ValueError (from list.remove) if absent.'''
        self._stores.remove(store)

    def insertDatastore(self, index, store):
        '''Insert `store` at `index`; raises TypeError if not a Datastore.'''
        if not isinstance(store, Datastore):
            raise TypeError("stores must be of type %s" % Datastore)
        self._stores.insert(index, store)
class TieredDatastore(DatastoreCollection):
    '''Represents a hierarchical collection of datastores.

    Each datastore is queried in order. This is helpful to organize access
    in terms of speed (i.e. hit caches first).
    '''

    def get(self, key):
        '''Return the object named by `key`, or None if no store has it.

        Stores are consulted in order. On a hit, the value is written back to
        every store that was consulted before the one that answered, so the
        next lookup is served by an earlier tier.
        '''
        for position, store in enumerate(self._stores):
            value = store.get(key)
            if value is None:
                continue

            # Back-fill by position rather than by comparing stores with `==`,
            # so duplicate members or a custom __eq__ cannot cut this short.
            for earlier in self._stores[:position]:
                earlier.put(key, value)
            return value

        return None

    def put(self, key, value):
        '''Store the object in all stores.'''
        for store in self._stores:
            store.put(key, value)

    def delete(self, key):
        '''Remove the object from all stores.'''
        for store in self._stores:
            store.delete(key)

    def contains(self, key):
        '''Return whether any store contains an object named by `key`.'''
        return any(store.contains(key) for store in self._stores)

    def query(self, query):
        '''Return a sequence of objects matching the criteria in `query`.

        Queries hit only the last datastore, which is taken to be the most
        complete one. With no stores at all there is nothing to match, so an
        empty list is returned.
        '''
        if not self._stores:
            return []
        return self._stores[-1].query(query)
class ShardedDatastore(DatastoreCollection):
    '''Represents a collection of datastore shards.

    A datastore is selected based on a sharding function. Sharding functions
    should take a Key and return an integer.

    WARNING: adding or removing datastores while running may severely affect
    consistency. Also ensure the order is correct upon initialization.
    While this is not as important for caches, it is crucial for
    persistent datastores.

    NOTE(review): the default sharding function is the builtin `hash`, which
    is only as stable as the keys' __hash__. Where string hashing is randomized
    per process, the same key can map to different shards across runs --
    confirm keys hash deterministically before relying on the default with
    persistent stores.
    '''

    def __init__(self, stores=None, shardingfn=hash):
        '''Initialize the datastore with any provided datastores.

        `stores` is an iterable of Datastore instances (default: none).
        `shardingfn` maps a key to an integer.

        Raises TypeError if `shardingfn` is not callable.
        '''
        if not callable(shardingfn):
            raise TypeError('shardingfn (type %s) is not callable' % type(shardingfn))

        # Never hand a shared default list to the parent: it keeps a reference.
        super(ShardedDatastore, self).__init__([] if stores is None else stores)
        self._shardingfn = shardingfn

    def shard(self, key):
        '''Return the index of the shard responsible for `key`.'''
        return self._shardingfn(key) % len(self._stores)

    def shardDatastore(self, key):
        '''Return the datastore responsible for `key`.'''
        return self.datastore(self.shard(key))

    def get(self, key):
        '''Return the object named by key from the corresponding datastore.'''
        return self.shardDatastore(key).get(key)

    def put(self, key, value):
        '''Stores the object to the corresponding datastore.'''
        self.shardDatastore(key).put(key, value)

    def delete(self, key):
        '''Removes the object from the corresponding datastore.'''
        self.shardDatastore(key).delete(key)

    def contains(self, key):
        '''Returns whether the object is in the corresponding datastore.'''
        return self.shardDatastore(key).contains(key)

    def query(self, query):
        '''Return a list of objects matching the criteria in `query`.

        The query is run against every shard; the partial results are merged,
        ordered with the cmp-style function `query.orderFn`, and cut to
        `query.limit` items.
        '''
        # Local import keeps this block self-contained.
        import functools

        # Explicit loop: a side-effecting map() is lazy on Python 3 and would
        # leave `items` empty.
        items = []
        for store in self._stores:
            items.extend(store.query(query))

        order_fn = query.orderFn
        if order_fn is None:
            # Mirrors sorted(items, cmp=None): natural ordering.
            items.sort()
        else:
            # The `cmp=` keyword is gone in Python 3; cmp_to_key adapts it.
            items.sort(key=functools.cmp_to_key(order_fn))

        return items[:query.limit]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import basic | |
import pylru | |
class LRUCache(basic.Datastore):
    '''Represents an LRU cache datastore, backed by pylru.'''

    def __init__(self, size):
        '''Create a cache backed by `pylru.lrucache(size)`.'''
        self._cache = pylru.lrucache(size)

    def __len__(self):
        '''Return the number of cached objects.'''
        return len(self._cache)

    def clear(self):
        '''Drop every cached object.'''
        self._cache.clear()

    def get(self, key):
        '''Return the object named by `key`, or None if it is not cached.'''
        try:
            return self._cache[key]
        except KeyError:
            # Cache miss. (`except KeyError, e` parses only on Python 2 and
            # the bound exception was never used.)
            return None

    def put(self, key, value):
        '''Store `value` under `key`.'''
        self._cache[key] = value

    def delete(self, key):
        '''Remove the object named by `key`; a missing key is ignored.'''
        if key in self._cache:
            del self._cache[key]

    def contains(self, key):
        '''Return whether an object named by `key` is in this datastore.'''
        return key in self._cache

    def query(self, query):
        '''Return the result of applying the callable `query` to all values.'''
        # Entire dataset already in memory, so ok to apply query naively.
        return query(self._cache.values())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import basic | |
import pymongo | |
import bson | |
__version__ = '1'

# Field names used inside the MongoDB documents handled by MongoDatastore.
kKEY = 'key'            # stringified datastore key; gets a unique index
kVAL = 'val'            # payload of a wrapper document
kMONGOID = '_id'        # MongoDB's own id field, stripped from values
kWRAPPED = 'dswrapped'  # flag marking a document as a wrapper
class MongoDatastore(basic.Datastore):
    '''Represents a Mongo database as a datastore.

    Objects are placed in one collection per key type. Values that are not
    already documents carrying their own key are wrapped in a small envelope
    document on the way in and unwrapped on the way out.

    NOTE(review): `update(..., safe=True)`, `remove(...)` and
    `find(...).count()` look like the legacy pymongo collection/cursor API --
    confirm against the pymongo version this project pins before upgrading.
    '''

    def __init__(self, mongoDatabase):
        '''Wrap `mongoDatabase`, which is indexed by collection name.'''
        self.database = mongoDatabase
        # Collection names whose key index was already requested in this run.
        self._indexed = {}

    def _collectionForType(self, type):
        '''Returns the `collection` corresponding to `type`.'''
        # Place objects in collections based on the key type.
        collection = self.database[type]

        # Ensure there is a unique index on the key, at least once per run.
        if type not in self._indexed:
            collection.create_index(kKEY, unique=True)
            self._indexed[type] = True

        return collection

    def _collection(self, key):
        '''Returns the `collection` corresponding to `key`.'''
        return self._collectionForType(key.type())

    @staticmethod
    def _wrap(key, value):
        '''Returns a value to insert. Non-documents are wrapped in a document.

        A dict that already carries `key` under the key field is stored as-is,
        minus any Mongo id field. The caller's dict is never modified: the id
        is stripped from a shallow copy.
        '''
        is_document = (
            isinstance(value, dict) and kKEY in value and value[kKEY] == key)
        if not is_document:
            return {kKEY: key, kVAL: value, kWRAPPED: True}

        if kMONGOID in value:
            # Copy before stripping so the caller's object is left untouched.
            value = value.copy()
            del value[kMONGOID]
        return value

    @staticmethod
    def _unwrap(value):
        '''Returns a value to return. Wrapped-documents are unwrapped.

        None (nothing found) and other non-dict values pass through unchanged.
        '''
        if not isinstance(value, dict):
            return value

        if value.get(kWRAPPED):
            return value[kVAL]

        # A stored document: hide Mongo's id field from callers.
        value.pop(kMONGOID, None)
        return value

    def get(self, key):
        '''Return the object named by key, or None if there is no match.'''
        # Query the corresponding mongodb collection for this key.
        document = self._collection(key).find_one({kKEY: str(key)})
        return self._unwrap(document)

    def put(self, key, value):
        '''Stores the object.'''
        sKey = str(key)
        document = self._wrap(sKey, value)

        # Update (or insert) the relevant document matching key.
        self._collection(key).update(
            {kKEY: sKey}, document, upsert=True, safe=True)

    def delete(self, key):
        '''Removes the object.'''
        self._collection(key).remove({kKEY: str(key)})

    def contains(self, key):
        '''Returns whether the object is in this datastore.'''
        return self._collection(key).find({kKEY: str(key)}).count() > 0

    def query(self, query):
        '''Returns a sequence of objects matching criteria expressed in `query`'''
        coll = self._collectionForType(query.type)
        return QueryTranslate.collectionQuery(coll, query)
class UnwrapperCursor(object):
    '''An iterator object to wrap around the mongodb cursor.

    Ensures objects fetched by queries are clear of any wrapping: every item
    produced by the underlying cursor is passed through
    `MongoDatastore._unwrap` before it is handed to the caller.
    '''

    def __init__(self, cursor):
        '''Wrap `cursor`, the iterator whose items should be unwrapped.'''
        self.cursor = cursor

    def __iter__(self):
        return self

    def next(self):
        '''Return the next unwrapped object; StopIteration ends iteration.'''
        # The builtin next() drives both Python 2 (`next`) and Python 3
        # (`__next__`) style iterators.
        return MongoDatastore._unwrap(next(self.cursor))

    # Python 3 spelling of the iterator protocol.
    __next__ = next
class QueryTranslate(object):
    '''Translates queries from dronestore queries to mongodb queries.'''

    # dronestore comparison operator -> mongodb query operator.
    # Equality ('=') has no operator; it is expressed as the bare value.
    COND_OPS = {'>': '$gt', '>=': '$gte', '!=': '$ne', '<=': '$lte', '<': '$lt'}

    # Fields addressed directly; any other field is looked up under
    # 'attributes.<name>.value' (see `field`).
    VERSION_FIELDS = \
        ['key', 'hash', 'parent', 'created', 'committed', 'attributes', 'type']

    @classmethod
    def collectionQuery(cls, collection, query):
        '''Run `query` against `collection`; return an unwrapping iterator.

        Applies the query's filters, then its orders, offset and limit when
        they are set.
        '''
        cursor = collection.find(cls.filters(query.filters))

        if query.orders:
            cursor.sort(cls.orders(query.orders))

        if query.offset and query.offset > 0:
            cursor.skip(query.offset)

        # NOTE(review): other code in this file slices with `query.limit`,
        # which tolerates None; guard here on the assumption that None means
        # "no limit" -- confirm against the query class.
        if query.limit is not None:
            cursor.limit(query.limit)

        return UnwrapperCursor(cursor)

    @classmethod
    def field(cls, field):
        '''Return the mongodb field path for the dronestore field `field`.'''
        if field in cls.VERSION_FIELDS:
            return field
        return 'attributes.%s.value' % field

    @classmethod
    def filter(cls, filter):
        '''Return the mongodb condition for one filter.

        Equality yields the bare value; other operators yield a one-entry
        operator document. Raises KeyError for an operator not in COND_OPS.
        '''
        if filter.op == '=':
            return filter.value
        return {cls.COND_OPS[filter.op]: filter.value}

    @classmethod
    def filters(cls, filters):
        '''Return the mongodb query document for a sequence of filters.

        The first filter on a field is stored under the field path. Further
        filters on the same field (e.g. a lower and an upper bound) are
        collected under '$and' so that none of them is silently dropped.
        '''
        spec = {}
        repeated = []
        for item in filters:
            path = cls.field(item.field)
            condition = cls.filter(item)
            if path in spec:
                repeated.append({path: condition})
            else:
                spec[path] = condition

        if repeated:
            spec['$and'] = repeated
        return spec

    @classmethod
    def orders(cls, orders):
        '''Return the sort specification as a list of (field, direction) pairs.

        Direction is 1 for ascending and -1 for descending. A list (not a
        dict) keeps the precedence of multi-key sorts and is the form
        pymongo's cursor sort accepts.
        '''
        return [
            (cls.field(order.field), 1 if order.isAscending() else -1)
            for order in orders
        ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment