Last active
August 16, 2019 01:38
-
-
Save diogommartins/c2d60fa932e29cb80cd9f68cafd89bae to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import asyncio | |
from typing import Dict, Union, List, Generator, Optional | |
from aiologger import Logger | |
from sieve_models.base import ModelMeta | |
from sqlalchemy import Table, update, Column, delete | |
from sqlalchemy.sql import ColumnCollection | |
from sqlalchemy.sql.expression import select, and_ | |
class ABCApiService(metaclass=abc.ABCMeta): | |
db_model: ModelMeta = NotImplementedError() | |
def __init__(self, logger: Logger, engine_write, engine_read=None): | |
loop = asyncio.get_event_loop() | |
self.engine_write = engine_write | |
if engine_read is None: | |
engine_read = engine_write | |
logger.debug({'info': "engine_read wasn't provided. Using engine_write"}) | |
self.engine_read = engine_read | |
@property | |
def table(self) -> Table: | |
return self.db_model.__table__ | |
@property | |
def default_values(self) -> Generator: | |
for col in self.table._columns: | |
if col.default is None: | |
continue | |
value = col.default.arg | |
if callable(value): | |
value = value(None) | |
yield col.name, value | |
@property | |
def primary_key(self) -> Column: | |
table = self.db_model.__table__ | |
pkeys: ColumnCollection = table.primary_key.columns | |
if len(pkeys) > 1: | |
raise NotImplementedError("Tables with composite primary keys " | |
"arent supported") | |
return pkeys.values()[0] | |
async def get(self, id: Union[int, str]) -> Dict: | |
"""returns the row with primary_key=id""" | |
async with self.engine_read.acquire() as conn: | |
cursor = await conn.execute( | |
select([self.db_model]) | |
.where(self.primary_key == id) | |
) | |
row = await cursor.fetchone() | |
if row: | |
return dict(row) | |
async def filter(self, params: Dict) -> Optional[List[Dict]]: | |
"""list of rows that match given params""" | |
async with self.engine_read.acquire() as conn: | |
filters = ((getattr(self.db_model, k) == v) for k, v in params.items()) | |
cur = await conn.execute(select([self.db_model]) | |
.where(and_(*filters))) | |
if cur.rowcount: | |
return [dict(row) async for row in cur] | |
async def insert(self, params: Dict) -> Dict[str, int]: | |
async with self.engine_write.acquire() as conn: | |
values = dict(self.default_values) | |
values.update(params) | |
result = await conn.execute( | |
self.table.insert().values(values) | |
) | |
if result.lastrowid: | |
return {self.primary_key.name: result.lastrowid} | |
else: | |
return {self.primary_key.name: params[self.primary_key.name]} | |
async def update(self, id: Union[int, str], params: Dict) -> int: | |
""" | |
:param id: the primary key id of the row to be updated | |
:param params: A dict of column:value to be updated | |
:return: the updated rows count | |
""" | |
async with self.engine_write.acquire() as conn: | |
result = await conn.execute( | |
update(self.db_model) | |
.where(self.primary_key == id) | |
.values(params) | |
) | |
return result.rowcount | |
async def delete(self, id: Union[int, str]) -> int: | |
""" | |
:param id: the primary key id of the row to be deleted | |
:return: the deleted rows count | |
""" | |
async with self.engine_write.acquire() as conn: | |
result = await conn.execute( | |
delete(self.db_model) | |
.where(self.primary_key == id) | |
) | |
return result.rowcount |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment