Skip to content

Instantly share code, notes, and snippets.

@ahancock1
Last active July 23, 2022 10:04
Show Gist options
  • Save ahancock1/72e459e294875988f4e6138d348122de to your computer and use it in GitHub Desktop.
Save ahancock1/72e459e294875988f4e6138d348122de to your computer and use it in GitHub Desktop.
python reactive rx sqlite nosql json document database with filter
MIT License
Copyright (c) 2022 Adam Hancock
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
import asyncio
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, Coroutine, Dict, List, Protocol

import rx
import rx.operators as ops
from aiosqlite import Connection, Cursor, connect
from aiosqlite.context import contextmanager
from jsonpickle import encode, decode
from rx.core import Observable
from rx.subject import Subject
def async_to_rx(source: Coroutine, loop=None) -> Observable:
    """Schedule a coroutine on an event loop and expose its outcome as an Observable.

    Args:
        source: The coroutine object to run.
        loop: Event loop to schedule the coroutine on. When ``None``, the
            loop returned by ``asyncio.get_event_loop()`` is used.

    Returns:
        The Observable produced by ``rx.from_future`` for the created task.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    # create_task schedules the coroutine right away; the Observable only
    # wraps the resulting task, it does not defer the work to subscription.
    task = loop.create_task(source)
    return rx.from_future(task)
# Shared ValueError instances raised by DataStore when a required argument
# is falsy (the checks use ``not value``, so empty values are rejected too).
KEY_ERROR = ValueError("key is None")
ITEM_ERROR = ValueError("item is None")
ARGS_ERROR = ValueError("args is None")
# A query is either a key string (matched against the ``key`` column) or a
# dict mapping a JSON field name to one value or a list of accepted values.
Query = str | Dict[str, Any | List[Any]]
class IDataStore(Protocol):
    """Structural interface of a reactive key/document store.

    Every operation returns an ``Observable``.

    NOTE(review): ``DataStore.get`` is declared as ``get(key: str)`` and
    ``DataStore.exists`` as ``exists(args: Query)``, which differ from the
    signatures declared here -- confirm which one is intended.
    """

    def changes(self) -> Observable:
        """Return the stream of all change events."""

    def created(self) -> Observable:
        """Return the stream of creation events."""

    def updated(self) -> Observable:
        """Return the stream of update events."""

    def deleted(self) -> Observable:
        """Return the stream of deletion events."""

    def get(self, args: Query = None) -> Observable:
        """Look up a single item matching ``args``."""

    def all(self, args: Query = None) -> Observable:
        """Look up every item matching ``args``."""

    def insert(self, key: str, item: Any) -> Observable:
        """Store ``item`` under a new ``key``."""

    def update(self, key: str, item: Any) -> Observable:
        """Replace the item stored under an existing ``key``."""

    def delete(self, key: str) -> Observable:
        """Remove the item stored under ``key``."""

    def count(self, args: Query = None) -> Observable:
        """Count the items matching ``args``."""

    def exists(self, key: str) -> Observable:
        """Report whether an item is stored under ``key``."""

    def save(self, key: str, item: Any) -> Observable:
        """Insert or update ``item`` under ``key``."""

    def clear(self) -> Observable:
        """Remove every item."""
class EventType(Enum):
    """Kind of change carried by a ``DataEvent``."""

    CREATED = 1  # a new item was inserted
    UPDATED = 2  # an existing item was replaced
    DELETED = 3  # an item was removed
@dataclass
class DataEvent:
    """Change notification published on a store's event stream."""

    # Item before the change; None for CREATED events.
    old_item: Any
    # Item after the change; None for DELETED events.
    new_item: Any
    # Which kind of change this event describes.
    event_type: EventType
class DataStore:
def __init__(self,
file_path: str,
table_name: str) -> None:
super().__init__()
self._file = file_path
self._name = table_name
self._init = False
self._events = Subject()
def changes(self) -> Observable:
return self._events.pipe(
ops.as_observable())
def created(self) -> Observable:
return self._events.pipe(
ops.filter(
lambda x: x.event_type == EventType.CREATED))
def updated(self) -> Observable:
return self._events.pipe(
ops.filter(
lambda x: x.event_type == EventType.UPDATED))
def deleted(self) -> Observable:
return self._events.pipe(
ops.filter(
lambda x: x.event_type == EventType.DELETED))
def get(self, key: str) -> Observable:
if not key:
raise KEY_ERROR
async def _():
async with self._connect() as db:
items = await self._select(db, key)
if not items:
return None
return items[0]
return async_to_rx(_())
def all(self, args: Query = None) -> Observable:
async def _():
async with self._connect() as db:
items = await self._select(db, args)
if not items:
return []
return items
return async_to_rx(_())
def insert(self, key: str, item: Any) -> Observable:
if not key:
raise KEY_ERROR
if not item:
raise ITEM_ERROR
async def _():
async with self._connect() as db:
n = await self._count(db, key)
if n > 0:
raise ValueError(f"key {key} already exists")
await self._insert(db, key, item)
return async_to_rx(_())
def update(self, key: str, item: Any) -> Observable:
if not key:
raise KEY_ERROR
if not item:
raise ITEM_ERROR
async def _():
async with self._connect() as db:
n = await self._count(db, key)
if n == 0:
raise ValueError(f"key {key} not found")
await self._update(db, key, item)
return async_to_rx(_())
def delete(self, key: str) -> Observable:
if not key:
raise KEY_ERROR
async def _():
async with self._connect() as db:
await self._delete(db, key)
return async_to_rx(_())
def count(self, args: Query = None) -> Observable:
async def _():
async with self._connect() as db:
return await self._count(db, args)
return async_to_rx(_())
def exists(self, args: Query) -> Observable:
if not args:
raise ARGS_ERROR
async def _():
async with self._connect() as db:
return await self._count(db, args) > 0
return async_to_rx(_())
def save(self, key: str, item: Any) -> Observable:
if not key:
raise KEY_ERROR
if not item:
raise ITEM_ERROR
async def _():
async with self._connect() as db:
if await self._count(db, key) > 0:
await self._update(db, key, item)
else:
await self._insert(db, key, item)
return async_to_rx(_())
def clear(self) -> Observable:
async def _():
async with self._connect() as db:
items = await self._select(db)
if not items:
return
await self._clear(db)
for item in items:
self._deleted(item)
return async_to_rx(_())
@contextmanager
async def _connect(self) -> Connection:
db = await connect(self._file)
await self._ensure(db)
return db
@contextmanager
async def _execute(self,
db: Connection,
sql: str,
values: List[Any] = []) -> Cursor:
x = await db.execute(sql, values)
self._print(sql, values)
return x
async def _ensure(self, db: Connection) -> None:
if self._init:
return
sql = f"""
CREATE TABLE IF NOT EXISTS {self._name}
(
key TEXT PRIMARY KEY,
data TEXT NOT NULL
)
"""
await self._commit(db, sql)
self._init = True
async def _commit(self,
db: Connection,
sql: str,
values: List[Any] = []) -> None:
await self._execute(db, sql, values)
await db.commit()
async def _fetchone(self,
db: Connection,
sql: str,
values: List[Any] = []) -> Any:
async with self._execute(db, sql, values) as x:
item = await x.fetchone()
if item is None:
return
return item[0]
async def _fetchall(self,
db: Connection,
sql: str,
values: List[Any] = []) -> Observable:
async with self._execute(db, sql, values) as x:
items = await x.fetchall()
if not items:
return
return [item[0] for item in items]
def _where(self, args: Query) -> str:
match args:
case str():
return f"WHERE key = ?"
case dict():
clause = [f"""
JSON_EXTRACT(data, '$.{k}')
IN ({
', '.join('?' * len(v))
if isinstance(v, list | tuple) else
'?'
})
""" for k, v in args.items()]
return "WHERE " + " AND ".join(clause)
case _:
return ""
def _values(self, args: Query) -> List[Any]:
match args:
case str():
return [args]
case dict():
return [
x for v in args.values()
for x in
(v if isinstance(v, list | tuple) else [v])
]
case _:
return []
async def _select(self,
db: Connection,
args: Query = None) -> List[Any]:
sql = f"""
SELECT data
FROM {self._name}
{self._where(args)}
"""
values = self._values(args)
items = await self._fetchall(db, sql, values)
return [decode(item) for item in items]
async def _insert(self,
db: Connection,
key: str,
item: Any) -> None:
sql = f"""
INSERT INTO {self._name} ( key, data )
VALUES ( ?, ? )
"""
values = [key, encode(item)]
await self._commit(db, sql, values)
self._created(item)
async def _update(self,
db: Connection,
key: str,
item: Any) -> None:
new_item = item
old_item = await self._select(db, key)
if not old_item:
return
old_item = old_item[0]
if new_item == old_item:
return
sql = f"""
UPDATE {self._name}
SET data = ?
WHERE key = ?
"""
values = [encode(item), key]
await self._commit(db, sql, values)
self._updated(old_item, new_item)
async def _count(self,
db: Connection,
args: Query) -> int:
sql = f"""
SELECT COUNT(*)
FROM {self._name}
{self._where(args)}
"""
values = self._values(args)
return await self._fetchone(
db, sql, values)
async def _delete(self, db: Connection, key: str) -> None:
item = await self._select(db, key)
if not item:
return
sql = f"""
DELETE FROM {self._name}
WHERE key = ?
"""
await self._commit(db, sql, [key])
self._deleted(item)
async def _clear(self, db: Connection) -> None:
sql = f"""
DELETE FROM {self._name}
"""
await self._commit(db, sql)
def _created(self, item: Any) -> None:
self._events.on_next(
DataEvent(None, item, EventType.CREATED))
def _updated(self, old_item: Any, new_item: Any) -> None:
self._events.on_next(
DataEvent(old_item, new_item, EventType.UPDATED))
def _deleted(self, item: Any) -> None:
self._events.on_next(
DataEvent(item, None, EventType.DELETED))
def _print(self, sql: str, args: List[Any] | Any = None) -> None:
sql_msg = re.sub(" +", " ", sql.replace("\n", "")).strip()
if args is not None:
if not isinstance(args, list):
args = [args]
sql_msg = sql_msg.replace("?", "%s")
args = [
f"'{x}'" if isinstance(x, str) else str(x)
for x in args
]
sql_msg = sql_msg % tuple(args)
print(sql_msg)
class Database:
    """Registry of ``DataStore`` tables that share one SQLite database file.

    Tables are addressed by name with subscript syntax (``db["users"]``).
    The ``DataStore`` for a name is created on first access and reused
    afterwards.
    """

    # Cache of stores already created, keyed by table name.
    _tables: Dict[str, DataStore]

    def __init__(self, file_path: str):
        """Remember the database file path.

        Args:
            file_path: Path handed to every ``DataStore`` this object creates.

        Raises:
            ValueError: If ``file_path`` is ``None`` or empty.
        """
        # Explicit check rather than ``assert``: assertions are stripped when
        # Python runs with -O, which would let an empty path through.
        if not file_path:
            raise ValueError("file_path is None or empty")
        self._file = file_path
        self._tables = {}

    def __getitem__(self, name: str) -> DataStore:
        """Return the store for table ``name``, creating it on first access."""
        if name not in self._tables:
            self._tables[name] = DataStore(
                self._file, name
            )
        return self._tables[name]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment