Last active
March 27, 2020 00:51
-
-
Save lecafard/0cfeaa55b5faa7f7b7faa6c4f0b52412 to your computer and use it in GitHub Desktop.
python odm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from datetime import datetime, date
from typing import Any, List, Optional, Tuple

import psycopg2.extras

from lib.db import conn
# Module-level logger; the query helpers below log every SQL statement at DEBUG.
logger = logging.getLogger(__name__)
class Model(object):
    """Minimal base class mapping a database table onto Python objects.

    Subclasses configure the mapping through class attributes:

    * ``_table`` -- name of the table the model is stored in.
    * ``_columns`` -- mapping of column name to the default attribute value.
    * ``_default_exclude`` -- columns left out of SELECTs and ``toJSON``
      whenever the caller does not pass ``exclude``.
    * ``_conn`` -- connection used for every query.

    NOTE: table, column and operator names are interpolated into the SQL
    text as-is; only values are sent as bound parameters. Never pass
    untrusted input where an identifier or operator is expected.
    """

    _table = ''
    _columns = {}
    _default_exclude = []
    _conn = conn

    def __init__(self, args: Optional[dict] = None, **kwargs):
        """Set one attribute per declared column.

        Values may be supplied as a single mapping (``Model({...})``) or as
        keyword arguments (``Model(**row)``), which is how the query helpers
        construct instances. Keyword arguments take precedence over ``args``.
        Columns present in neither fall back to their default from
        ``_columns``; keys that are not declared columns are ignored.
        """
        values = {} if args is None else args
        for name, default in type(self)._columns.items():
            if name in kwargs:
                setattr(self, name, kwargs[name])
            else:
                setattr(self, name, values.get(name, default))

    def toJSON(self, exclude=None):
        """Return the column values as a JSON-friendly dict.

        ``datetime`` values become integer Unix timestamps and ``date``
        values become ``YYYY-MM-DD`` strings. Columns listed in ``exclude``
        (default: ``_default_exclude``) are omitted.
        """
        if exclude is None:
            exclude = type(self)._default_exclude
        out = {}
        for name in type(self)._columns:
            if name in exclude:
                continue
            value = getattr(self, name)
            # datetime is a subclass of date, so it has to be tested first.
            if isinstance(value, datetime):
                value = int(value.timestamp())
            elif isinstance(value, date):
                value = value.strftime("%Y-%m-%d")
            out[name] = value
        return out

    def _one_to_many(self, cls2, col, col2, offset=0, limit=50, exclude=None):
        """Fetch the ``cls2`` rows whose ``col2`` equals this object's ``col``.

        Returns a list of ``cls2`` instances, paginated by ``limit`` and
        ``offset``. ``exclude`` defaults to ``cls2._default_exclude`` because
        the selected columns belong to ``cls2``.
        """
        if exclude is None:
            exclude = cls2._default_exclude
        query = (f'SELECT {cls2._get_columns_sql(exclude=exclude)} FROM {cls2._table} '
                 f'WHERE {col2}=%s LIMIT %s OFFSET %s')
        key = getattr(self, col, type(self)._columns[col])
        # Delegate to the shared helper so related rows are logged and
        # instantiated exactly like every other query.
        return cls2._query_many(query, (key, limit, offset))

    @classmethod
    def _get_columns_sql(cls, exclude=None):
        """Return the comma-separated column list for a SELECT statement."""
        # since it's only a small number of columns, a linear search
        # through the exclude list is fine
        if exclude is None:
            exclude = cls._default_exclude
        return ','.join(name for name in cls._columns if name not in exclude)

    @classmethod
    def _query_many(cls, query, params=tuple()):
        """Run ``query`` and return one instance per result row."""
        with cls._conn, cls._conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            logger.debug('SQL Query: %s', query)
            cur.execute(query, params)
            return [cls(**row) for row in cur.fetchall()]

    @classmethod
    def _query_one(cls, query, params=tuple()):
        """Run ``query`` and return the first row as an instance, or ``None``."""
        with cls._conn, cls._conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            logger.debug('SQL Query: %s', query)
            cur.execute(query, params)
            res = cur.fetchone()
            if not res:
                return None
            return cls(**res)

    @classmethod
    def _insert(cls, data: dict, returning: Optional[List] = None):
        """Insert ``data`` (column -> value) and return the resulting instance.

        When ``returning`` names columns, they are read back through a
        ``RETURNING`` clause and set on the instance. Every key of ``data``
        is then set as an attribute on the returned object.
        """
        if data:
            columns = ",".join(data)
            placeholders = ",".join(["%s"] * len(data))
            query = f'INSERT INTO {cls._table} ({columns}) VALUES ({placeholders})'
        else:
            # "INSERT INTO t () VALUES ()" is not valid SQL.
            query = f'INSERT INTO {cls._table} DEFAULT VALUES'
        params = tuple(data.values())

        obj = None
        if returning:
            query += f' RETURNING {",".join(returning)}'
            obj = cls._query_one(query, params)
        else:
            # Without RETURNING the statement produces no result set, so it
            # must be executed without fetching a row.
            with cls._conn, cls._conn.cursor() as cur:
                logger.debug('SQL Query: %s', query)
                cur.execute(query, params)
        if obj is None:
            obj = cls()
        for name, value in data.items():
            setattr(obj, name, value)
        return obj

    @classmethod
    def by_id(cls, _id, exclude=None):
        """Return the row whose ``_id`` column equals ``_id``, or ``None``."""
        if exclude is None:
            exclude = cls._default_exclude
        query = f'SELECT {cls._get_columns_sql(exclude=exclude)} FROM {cls._table} WHERE _id=%s'
        return cls._query_one(query, (_id,))

    @classmethod
    def get_all(cls, offset=0, limit=50,
                where: Optional[List[Tuple[str, str, Any]]] = None, exclude=None):
        """Return a page of rows, optionally filtered.

        ``where`` is a list of ``(column, operator, value)`` triples that are
        combined with ``AND``; the value of each triple is sent as a bound
        parameter.
        """
        if exclude is None:
            exclude = cls._default_exclude
        query = f'SELECT {cls._get_columns_sql(exclude=exclude)} FROM {cls._table}'
        params = []
        if where:
            # Spaces around the operator keep word operators such as LIKE valid.
            clauses = [f'{column} {operator} %s' for column, operator, _ in where]
            query += f' WHERE {" AND ".join(clauses)}'
            params.extend(value for _, _, value in where)
        query += ' LIMIT %s OFFSET %s'
        params.extend((limit, offset))
        return cls._query_many(query, tuple(params))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment