| from datetime import datetime | |
| import arrow | |
| import jsonmodels.models | |
| import tinydb | |
| from six import add_metaclass | |
| from tinydb_serialization import SerializationMiddleware | |
| from tinydb_serialization import Serializer | |
class DateTimeSerializer(Serializer):
    """Serializer that stores ``datetime`` values as ISO-8601 strings.

    Both directions are delegated to ``arrow``.
    """

    # Class of the objects this serializer handles.
    OBJ_CLASS = datetime

    def encode(self, obj):
        """Return the ISO-8601 text arrow produces for ``obj``."""
        moment = arrow.get(obj)
        return moment.isoformat()

    def decode(self, s):
        """Parse ``s`` with arrow and return the resulting ``datetime``."""
        moment = arrow.get(s)
        return moment.datetime
class DatabaseProxy(object):
    """
    Proxy that is used during context binding.

    Attribute reads that are not found on the proxy itself are forwarded
    to the object bound with ``proxy()``. While nothing is bound (``db`` is
    ``None``) every forwarded read raises ``AttributeError``.
    """

    __slots__ = ['db']

    def __init__(self):
        self.proxy(None)

    def proxy(self, db):
        """Bind ``db`` as the forwarding target; pass ``None`` to unbind."""
        self.db = db

    def __getattr__(self, attr):
        """Forward a failed attribute lookup to the bound database.

        Raises:
            AttributeError: if no database is bound, if the bound database
                lacks ``attr``, or if ``attr`` is one of the proxy's own
                slots that has not been assigned yet.
        """
        # __getattr__ only runs after normal lookup failed. If the slot
        # itself is unset (e.g. an instance created through ``__new__``, as
        # copy/pickle do), reading ``self.db`` below would re-enter this
        # method without end, so fail fast for our own slot names.
        if attr in self.__slots__:
            raise AttributeError(attr)
        if self.db is None:
            raise AttributeError('Cannot use uninitialized DatabaseProxy.')
        return getattr(self.db, attr)

    def __setattr__(self, attr, value):
        """Allow assignment to declared slots only.

        Raises:
            AttributeError: if ``attr`` is not listed in ``__slots__``.
        """
        if attr not in self.__slots__:
            raise AttributeError('Cannot set attribute on DatabaseProxy.')
        return super(DatabaseProxy, self).__setattr__(attr, value)
class Database(object):
    """
    Basic database that can be used globally or per request.

    Passing ``':memory:'`` as ``db_file`` selects in-memory storage; any
    other value is handed to tinydb as the path of a JSON file.

    Example:

        database = Database('db.json')

        class Cat(TinyModel):
            name = fields.StringField(required=True)

            class Meta:
                database = database

        Cat(name='Chuck').save()
    """

    def __init__(self, db_file):
        """Open a TinyDB instance with the datetime serializer registered."""
        in_memory = ':memory:' == db_file

        if in_memory:
            storage_cls = tinydb.storages.MemoryStorage
        else:
            storage_cls = tinydb.storages.JSONStorage

        # Wrap the storage so datetime values are serialized on the way in/out.
        middleware = SerializationMiddleware(storage_cls)
        middleware.register_serializer(DateTimeSerializer(), 'TinyDateTime')

        if in_memory:
            self.db = tinydb.TinyDB(storage=middleware)
        else:
            self.db = tinydb.TinyDB(db_file, storage=middleware)

    def __repr__(self):
        """Return ``<ClassName: storage=StorageClassName>``."""
        own_name = self.__class__.__name__
        storage_name = self.db._storage.__class__.__name__
        return '<{}: storage={}>'.format(own_name, storage_name)
class ProxyDatabase(Database):
    """
    Base proxy to be used where more control is needed over
    the currently active database.

    Used as a context manager: on entry every model's
    ``Meta.database`` proxy is bound to this database, on exit the
    database is closed and the proxies are unbound again.

    Example:

        class Cat(TinyModel):
            name = fields.StringField(required=True)

            class Meta:
                database = DatabaseProxy()

        with ProxyDatabase(':memory:', [Cat]):
            Cat(name='Chuck').save()
    """

    def __init__(self, db_file, models):
        """Open ``db_file`` and remember the ``models`` to bind on entry."""
        super(ProxyDatabase, self).__init__(db_file)
        self.models = models

    def __enter__(self):
        """Bind every model's proxy to this database and return the db."""
        for model in self.models:
            # setup proxy binding
            model.Meta.database.proxy(self.db)
        return self.db

    def __exit__(self, *args):
        """Close the database and unbind every model's proxy.

        Returns ``None``, so an exception raised inside the ``with`` body
        is not suppressed.
        """
        try:
            self.db.close()
        finally:
            # Unbind even when close() fails, so models are never left
            # pointing at a database that is in an unknown state.
            for model in self.models:
                # remove proxy binding
                model.Meta.database.proxy(None)
class TablenameMeta(type):
    """Metaclass that gives each class a default ``__tablename__``.

    A class body that does not define ``__tablename__`` itself receives the
    lower-cased class name; an explicit value is left untouched.
    """

    def __new__(cls, name, parents, attrs):
        if '__tablename__' not in attrs:
            attrs['__tablename__'] = name.lower()
        new_class = type.__new__(cls, name, parents, attrs)
        return new_class
@add_metaclass(TablenameMeta)
class ModelWrapper(jsonmodels.models.Base):
    """jsonmodels base class created through ``TablenameMeta``."""

    def __repr__(self):
        """Return ``<ClassName: __tablename__=name>``."""
        cls_name = self.__class__.__name__
        table_name = self.__tablename__
        return '<{}: __tablename__={}>'.format(cls_name, table_name)
class TinyModel(ModelWrapper):
    """Base model for all inherited models.

    Subclasses are expected to provide a ``Meta`` class with a ``database``
    attribute exposing ``table(name)``; rows live in the table named by
    ``__tablename__``.
    """

    def __init__(self, eid=None, *args, **kwargs):
        """Build the model from field values.

        ``eid`` is the id of the stored row, or ``None`` for an instance
        that has not been saved yet.
        """
        super(TinyModel, self).__init__(*args, **kwargs)
        self.eid = eid

    @property
    def table(self):
        """Returns tinydb.table instance for this model"""
        return self.Meta.database.table(self.__tablename__)

    @classmethod
    def get(cls, cond):
        """Returns model instance for provided cond.

        Returns ``None`` when no row matches ``cond``.
        """
        table = cls.Meta.database.table(cls.__tablename__)
        res = table.get(cond)
        if res is None:
            # table.get signals "no match" with None; propagate that
            # instead of failing on ``res.eid``.
            return None
        return cls(eid=res.eid, **res)

    @classmethod
    def search(cls, cond):
        """Returns model instances matching cond"""
        table = cls.Meta.database.table(cls.__tablename__)
        return [cls(eid=row.eid, **row) for row in table.search(cond)]

    @classmethod
    def all(cls):
        """Returns all model instances"""
        table = cls.Meta.database.table(cls.__tablename__)
        return [cls(eid=row.eid, **row) for row in table.all()]

    def save(self):
        """Saves or updates this model instance to database.

        An instance without an ``eid`` is inserted and the id returned by
        the insert is stored on the instance and returned. An instance
        with an ``eid`` updates that row and returns whatever the table's
        ``update`` returns.
        """
        if not self.eid:
            # Remember the assigned id so a later save() updates this row
            # rather than inserting a duplicate.
            self.eid = self.table.insert(self.to_struct())
            return self.eid
        return self.table.update(self.to_struct(), eids=[self.eid])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment