datastore api
class Datastore(object):
  '''A Datastore represents storage for serialized dronestore versions.

  Datastores are general enough to be backed by all kinds of different storage:
  in-memory caches, databases, a remote cache, flat files on disk, etc.

  The general idea is to wrap a more complicated storage facility in a simple,
  uniform interface, keeping the freedom of using the right tools for the job.
  In particular, a Datastore can aggregate other datastores in interesting ways,
  like sharded (to distribute load) or tiered access (caches before databases).

  While Datastores should be written general enough to accept all sorts of
  values, some implementations will undoubtedly have to be specific (e.g. SQL
  databases where fields should be decomposed into columns), particularly those
  that support Queries.

  This interface matches the Drone's well, as it supports each of its calls.
  '''

  def get(self, key):
    '''Return the object named by key.'''
    raise NotImplementedError

  def put(self, key, value):
    '''Stores the object.'''
    raise NotImplementedError

  def delete(self, key):
    '''Removes the object.'''
    raise NotImplementedError

  def contains(self, key):
    '''Returns whether the object is in this datastore.'''
    raise NotImplementedError

  def query(self, query):
    '''Returns a sequence of objects matching criteria expressed in `query`.'''
    raise NotImplementedError
class DictDatastore(Datastore):
  '''Simple straw-man in-memory datastore backed by a dict.

  WARNING: it does not evict entries, so it will grow indefinitely. Use this
  for testing, short-lived, or small working-set programs.
  '''

  def __init__(self):
    self._items = {}

  def get(self, key):
    '''Returns the object named by `key`, or None if it is not found.'''
    try:
      return self._items[key]
    except KeyError:
      return None

  def put(self, key, value):
    '''Stores the object. A None `value` deletes the entry instead.'''
    if value is None:
      self.delete(key)
    else:
      self._items[key] = value

  def delete(self, key):
    '''Removes the object, if present.'''
    try:
      del self._items[key]
    except KeyError:
      pass

  def contains(self, key):
    '''Returns whether the object is in this datastore.'''
    return key in self._items

  def query(self, query):
    '''Returns a sequence of objects matching criteria expressed in `query`.'''
    # the entire dataset is already in memory, so it is ok to apply the
    # query naively over all values.
    return query(self._items.values())

  def __len__(self):
    return len(self._items)
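
# Example (not part of the original gist): a minimal sketch of the uniform
# Datastore interface using DictDatastore. Plain strings stand in for
# dronestore Keys here, purely for illustration.
def _example_dict_datastore():
  ds = DictDatastore()
  ds.put('awesome:cat', {'name': 'cat'})
  assert ds.contains('awesome:cat')
  assert ds.get('awesome:cat') == {'name': 'cat'}
  ds.delete('awesome:cat')
  assert ds.get('awesome:cat') is None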
class DatastoreCollection(Datastore):
  '''Represents a collection of datastores.'''

  def __init__(self, stores=None):
    '''Initialize the collection with any provided datastores.'''
    # copy into a fresh list so instances never share a mutable default
    stores = list(stores) if stores is not None else []

    for store in stores:
      if not isinstance(store, Datastore):
        raise TypeError("all stores must be of type %s" % Datastore)

    self._stores = stores

  def datastore(self, index):
    return self._stores[index]

  def appendDatastore(self, store):
    if not isinstance(store, Datastore):
      raise TypeError("stores must be of type %s" % Datastore)

    self._stores.append(store)

  def removeDatastore(self, store):
    self._stores.remove(store)

  def insertDatastore(self, index, store):
    if not isinstance(store, Datastore):
      raise TypeError("stores must be of type %s" % Datastore)

    self._stores.insert(index, store)
class TieredDatastore(DatastoreCollection):
  '''Represents a hierarchical collection of datastores.

  Each datastore is queried in order. This is helpful to organize access
  in terms of speed (i.e. hit caches first).
  '''

  def get(self, key):
    '''Returns the object named by `key`, checking each store in order.'''
    value = None
    for store in self._stores:
      value = store.get(key)
      if value is not None:
        break

    # on a hit, write the value back into the faster stores that missed,
    # so subsequent gets find it earlier in the hierarchy.
    if value is not None:
      for store2 in self._stores:
        if store == store2:
          break
        store2.put(key, value)

    return value

  def put(self, key, value):
    '''Stores the object in all stores.'''
    for store in self._stores:
      store.put(key, value)

  def delete(self, key):
    '''Removes the object from all stores.'''
    for store in self._stores:
      store.delete(key)

  def contains(self, key):
    '''Returns whether the object is in this datastore.'''
    for store in self._stores:
      if store.contains(key):
        return True
    return False

  def query(self, query):
    '''Returns a sequence of objects matching criteria expressed in `query`.'''
    # queries hit the last (most complete) datastore
    return self._stores[-1].query(query)
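
# Example (not part of the original gist): a hedged sketch of tiered access
# using two DictDatastores as stand-ins for a fast cache and a slower, more
# complete store.
def _example_tiered_datastore():
  cache = DictDatastore()
  storage = DictDatastore()
  tiered = TieredDatastore([cache, storage])

  storage.put('awesome:cat', {'name': 'cat'})   # present only in the slow store
  assert tiered.get('awesome:cat') == {'name': 'cat'}
  assert cache.contains('awesome:cat')          # get() wrote it back to the cache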
class ShardedDatastore(DatastoreCollection):
  '''Represents a collection of datastore shards.

  A datastore is selected based on a sharding function.
  Sharding functions should take a Key and return an integer.

  WARNING: adding or removing datastores while running may severely affect
  consistency. Also ensure the order is correct upon initialization. While
  this is not as important for caches, it is crucial for persistent
  datastores.
  '''

  def __init__(self, stores=None, shardingfn=hash):
    '''Initialize the datastore with any provided datastores.'''
    if not callable(shardingfn):
      raise TypeError('shardingfn (type %s) is not callable' % type(shardingfn))

    super(ShardedDatastore, self).__init__(stores)
    self._shardingfn = shardingfn

  def shard(self, key):
    return self._shardingfn(key) % len(self._stores)

  def shardDatastore(self, key):
    return self.datastore(self.shard(key))

  def get(self, key):
    '''Return the object named by key from the corresponding datastore.'''
    return self.shardDatastore(key).get(key)

  def put(self, key, value):
    '''Stores the object in the corresponding datastore.'''
    self.shardDatastore(key).put(key, value)

  def delete(self, key):
    '''Removes the object from the corresponding datastore.'''
    self.shardDatastore(key).delete(key)

  def contains(self, key):
    '''Returns whether the object is in this datastore.'''
    return self.shardDatastore(key).contains(key)

  def query(self, query):
    '''Returns a sequence of objects matching criteria expressed in `query`.'''
    # query every shard, merge the results, then re-apply ordering and limit
    items = []
    for result in [s.query(query) for s in self._stores]:
      items.extend(result)

    items = sorted(items, cmp=query.orderFn)
    return items[:query.limit]
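
# Example (not part of the original gist): a hedged sketch of sharding across
# two DictDatastores. Plain string keys are used for illustration, so the
# built-in hash serves as the sharding function.
def _example_sharded_datastore():
  shards = [DictDatastore(), DictDatastore()]
  sharded = ShardedDatastore(shards, shardingfn=hash)

  sharded.put('awesome:cat', {'name': 'cat'})
  assert sharded.contains('awesome:cat')
  # exactly one shard holds the entry
  assert sum(len(s) for s in shards) == 1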
# -- separate module in the gist: an LRU cache datastore. it imports the
#    classes above as `basic` and requires the third-party `pylru` package.

import basic

import pylru


class LRUCache(basic.Datastore):
  '''Represents an LRU cache datastore, backed by pylru.'''

  def __init__(self, size):
    self._cache = pylru.lrucache(size)

  def __len__(self):
    return len(self._cache)

  def clear(self):
    self._cache.clear()

  def get(self, key):
    '''Returns the object named by `key`, or None if it is not cached.'''
    try:
      return self._cache[key]
    except KeyError:
      return None

  def put(self, key, value):
    '''Stores the object.'''
    self._cache[key] = value

  def delete(self, key):
    '''Removes the object, if present.'''
    if key in self._cache:
      del self._cache[key]

  def contains(self, key):
    '''Returns whether the object is in this datastore.'''
    return key in self._cache

  def query(self, query):
    '''Returns a sequence of objects matching criteria expressed in `query`.'''
    # the entire dataset is already in memory, so it is ok to apply the
    # query naively over all cached values.
    return query(self._cache.values())
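
# Example (not part of the original gist): a hedged sketch of the LRU cache,
# assuming the `pylru` package is installed. Older entries are evicted once
# the cache exceeds its configured size.
def _example_lru_cache():
  cache = LRUCache(2)
  cache.put('a', 1)
  cache.put('b', 2)
  cache.put('c', 3)            # evicts 'a', the least recently used entry
  assert not cache.contains('a')
  assert cache.get('b') == 2 and cache.get('c') == 3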
# -- separate module in the gist: a MongoDB-backed datastore. it imports the
#    basic datastore code above as `basic` and requires the `pymongo` package.

import basic

import pymongo
import bson

__version__ = '1'

kKEY = 'key'
kVAL = 'val'
kMONGOID = '_id'
kWRAPPED = 'dswrapped'


class MongoDatastore(basic.Datastore):
  '''Represents a Mongo database as a datastore.'''

  def __init__(self, mongoDatabase):
    self.database = mongoDatabase
    self._indexed = {}

  def _collectionForType(self, type):
    '''Returns the `collection` corresponding to `type`.'''
    # place objects in collections based on the key type
    collection = self.database[type]

    # ensure there is an index, at least once per run.
    if type not in self._indexed:
      collection.create_index(kKEY, unique=True)
      self._indexed[type] = True

    return collection

  def _collection(self, key):
    '''Returns the `collection` corresponding to `key`.'''
    return self._collectionForType(key.type())

  @staticmethod
  def _wrap(key, value):
    '''Returns a value to insert. Non-documents are wrapped in a document.'''
    if not isinstance(value, dict) or kKEY not in value or value[kKEY] != key:
      return { kKEY:key, kVAL:value, kWRAPPED:True }

    if kMONGOID in value:
      del value[kMONGOID]

    return value

  @staticmethod
  def _unwrap(value):
    '''Returns a value to return. Wrapped documents are unwrapped.'''
    if value is not None and kWRAPPED in value and value[kWRAPPED]:
      return value[kVAL]

    if isinstance(value, dict) and kMONGOID in value:
      del value[kMONGOID]

    return value

  def get(self, key):
    '''Return the object named by key.'''
    # query the corresponding mongodb collection for this key
    value = self._collection(key).find_one( { kKEY:str(key) } )
    return self._unwrap(value)

  def put(self, key, value):
    '''Stores the object.'''
    sKey = str(key)
    value = self._wrap(sKey, value)

    # update (or insert) the relevant document matching key
    self._collection(key).update( { kKEY:sKey }, value, upsert=True, safe=True)

  def delete(self, key):
    '''Removes the object.'''
    self._collection(key).remove( { kKEY:str(key) } )

  def contains(self, key):
    '''Returns whether the object is in this datastore.'''
    return self._collection(key).find( { kKEY:str(key) } ).count() > 0

  def query(self, query):
    '''Returns a sequence of objects matching criteria expressed in `query`.'''
    coll = self._collectionForType(query.type)
    return QueryTranslate.collectionQuery(coll, query)
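
# Example (not part of the original gist): a hedged sketch of wiring up the
# MongoDatastore. It assumes a reachable local mongod and the pymongo 2.x API
# this gist targets (Connection, update(safe=True), remove, find().count()).
# `FakeKey` is a hypothetical stand-in for dronestore's Key: it stringifies
# and reports a type that selects the collection.
def _example_mongo_datastore():
  class FakeKey(object):
    def __init__(self, type_, name):
      self._type, self._name = type_, name
    def type(self):
      return self._type
    def __str__(self):
      return '/%s/%s' % (self._type, self._name)

  conn = pymongo.Connection()                 # connects to localhost:27017
  ds = MongoDatastore(conn.dronestore_test)   # any database name works here

  key = FakeKey('Cat', 'felix')
  ds.put(key, {'name': 'cat'})                # wrapped into a keyed document
  assert ds.contains(key)
  assert ds.get(key) == {'name': 'cat'}       # unwrapped on the way out
  ds.delete(key)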
class UnwrapperCursor(object):
  '''An iterator object to wrap around the mongodb cursor.
  Ensures objects fetched by queries are clear of any wrapping.
  '''

  def __init__(self, cursor):
    self.cursor = cursor

  def __iter__(self):
    return self

  def next(self):
    return MongoDatastore._unwrap(self.cursor.next())


class QueryTranslate(object):
  '''Translates queries from dronestore queries to mongodb queries.'''

  COND_OPS = { '>':'$gt', '>=':'$gte', '!=':'$ne', '<=':'$lte', '<':'$lt' }

  VERSION_FIELDS = \
    ['key', 'hash', 'parent', 'created', 'committed', 'attributes', 'type']

  @classmethod
  def collectionQuery(cls, collection, query):
    cursor = collection.find(cls.filters(query.filters))
    if len(query.orders) > 0:
      cursor.sort(cls.orders(query.orders))
    if query.offset > 0:
      cursor.skip(query.offset)
    cursor.limit(query.limit)
    return UnwrapperCursor(cursor)

  @classmethod
  def field(cls, field):
    # version-level fields are stored top-level; everything else lives under
    # the attributes document.
    if field in cls.VERSION_FIELDS:
      return field
    return 'attributes.%s.value' % field

  @classmethod
  def filter(cls, filter):
    # '=' maps to an exact match; other operators map to mongo conditionals
    if filter.op == '=':
      return filter.value
    return { cls.COND_OPS[filter.op] : filter.value }

  @classmethod
  def filters(cls, filters):
    keys = [cls.field(f.field) for f in filters]
    vals = [cls.filter(f) for f in filters]
    return dict(zip(keys, vals))

  @classmethod
  def orders(cls, orders):
    # pymongo's cursor.sort expects a list of (field, direction) pairs
    keys = [cls.field(o.field) for o in orders]
    vals = [1 if o.isAscending() else -1 for o in orders]
    return list(zip(keys, vals))
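
# Example (not part of the original gist): a hedged sketch of the filter and
# order translation alone, without a live MongoDB. `FakeFilter` and
# `FakeOrder` are hypothetical stand-ins for dronestore's query objects,
# assumed only to expose the attributes this translator reads.
def _example_query_translate():
  class FakeFilter(object):
    def __init__(self, field, op, value):
      self.field, self.op, self.value = field, op, value

  class FakeOrder(object):
    def __init__(self, field, ascending=True):
      self.field, self._ascending = field, ascending
    def isAscending(self):
      return self._ascending

  spec = QueryTranslate.filters([FakeFilter('committed', '>=', 1315000000),
                                 FakeFilter('name', '=', 'cat')])
  assert spec == {'committed': {'$gte': 1315000000},
                  'attributes.name.value': 'cat'}

  order = QueryTranslate.orders([FakeOrder('committed', ascending=False)])
  assert order == [('committed', -1)]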