Skip to content

Instantly share code, notes, and snippets.

@diogommartins
Last active August 16, 2019 01:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diogommartins/c2d60fa932e29cb80cd9f68cafd89bae to your computer and use it in GitHub Desktop.
Save diogommartins/c2d60fa932e29cb80cd9f68cafd89bae to your computer and use it in GitHub Desktop.
import abc
import asyncio
from typing import Dict, Union, List, Generator, Optional
from aiologger import Logger
from sieve_models.base import ModelMeta
from sqlalchemy import Table, update, Column, delete
from sqlalchemy.sql import ColumnCollection
from sqlalchemy.sql.expression import select, and_
class ABCApiService(metaclass=abc.ABCMeta):
db_model: ModelMeta = NotImplementedError()
def __init__(self, logger: Logger, engine_write, engine_read=None):
loop = asyncio.get_event_loop()
self.engine_write = engine_write
if engine_read is None:
engine_read = engine_write
logger.debug({'info': "engine_read wasn't provided. Using engine_write"})
self.engine_read = engine_read
@property
def table(self) -> Table:
return self.db_model.__table__
@property
def default_values(self) -> Generator:
for col in self.table._columns:
if col.default is None:
continue
value = col.default.arg
if callable(value):
value = value(None)
yield col.name, value
@property
def primary_key(self) -> Column:
table = self.db_model.__table__
pkeys: ColumnCollection = table.primary_key.columns
if len(pkeys) > 1:
raise NotImplementedError("Tables with composite primary keys "
"arent supported")
return pkeys.values()[0]
async def get(self, id: Union[int, str]) -> Dict:
"""returns the row with primary_key=id"""
async with self.engine_read.acquire() as conn:
cursor = await conn.execute(
select([self.db_model])
.where(self.primary_key == id)
)
row = await cursor.fetchone()
if row:
return dict(row)
async def filter(self, params: Dict) -> Optional[List[Dict]]:
"""list of rows that match given params"""
async with self.engine_read.acquire() as conn:
filters = ((getattr(self.db_model, k) == v) for k, v in params.items())
cur = await conn.execute(select([self.db_model])
.where(and_(*filters)))
if cur.rowcount:
return [dict(row) async for row in cur]
async def insert(self, params: Dict) -> Dict[str, int]:
async with self.engine_write.acquire() as conn:
values = dict(self.default_values)
values.update(params)
result = await conn.execute(
self.table.insert().values(values)
)
if result.lastrowid:
return {self.primary_key.name: result.lastrowid}
else:
return {self.primary_key.name: params[self.primary_key.name]}
async def update(self, id: Union[int, str], params: Dict) -> int:
"""
:param id: the primary key id of the row to be updated
:param params: A dict of column:value to be updated
:return: the updated rows count
"""
async with self.engine_write.acquire() as conn:
result = await conn.execute(
update(self.db_model)
.where(self.primary_key == id)
.values(params)
)
return result.rowcount
async def delete(self, id: Union[int, str]) -> int:
"""
:param id: the primary key id of the row to be deleted
:return: the deleted rows count
"""
async with self.engine_write.acquire() as conn:
result = await conn.execute(
delete(self.db_model)
.where(self.primary_key == id)
)
return result.rowcount
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment