Skip to content

Instantly share code, notes, and snippets.

@erincar
Created February 9, 2025 09:51
Show Gist options
  • Save erincar/e22cdb32502f38d1b23e3108d448498e to your computer and use it in GitHub Desktop.
Save erincar/e22cdb32502f38d1b23e3108d448498e to your computer and use it in GitHub Desktop.
Old web doodles
from functools import reduce
from ordered_set import OrderedSet # TODO: add to requirements
from django.utils.translation import gettext as _
# TODO: analyze inbound serializers for use cases
# TODO: analyze outbound serializers for use cases
# TODO: add case variables to exception messages
# define an exception with its check method, s.t.
# @Error.check(parameters) decorator checks for errors in a method,
# raises the Error if necessary
class CheckingError(Exception):
    """Base for exceptions that will eventually carry their own check decorator.

    The decorator logic is not implemented yet; ``check`` is a placeholder
    that accepts its arguments and returns ``None``.
    """

    def check(self, methods=None, error_message=None):
        """Placeholder for the future checking decorator (currently a no-op).

        methods -> optional list of method names to decorate (unused for now)
        error_message -> optional message for the raised error (unused for now)
        """
        # TODO: accept decorated method/class
        # TODO: if methods are defined, treat decorated object as class, treat as method otherwise
        # TODO: decorate all methods (or object itself) with
        # TODO: return the decorator
        return None
class SerializerErrors:
    """Namespace grouping the exceptions raised by the serializer helpers."""

    class ModelMismatchError(Exception):
        """Raised when combined serializers do not all target the same Meta.model."""

    class UnequalFieldSetsError(Exception):
        """Raised when a field ordering does not name exactly the serializer's fields."""
# TODO: handle custom fields of serializers
# TODO: handle conflicts between custom fields of different purpose (currently, latter overrides)
# TODO: minimize occurrence for this case
# TODO: handle partial orderings (e.g. S1->a,c | S2->b, (S1, S2) -> a,b,c)
# TODO: allow for custom ordering or set a standard ordering (e.g., given order)
# Solution: Create a FieldOrdering class to override all previous order, to be extended last
# Solution 2: Field ordering class decorator that decorates serializer init with Meta.fields alteration
# (not sure if init will be enough)
def CombinedModelSerializer(final_model, *serializer_classes):
    """
    Creates and returns combined model serializer with given serializer classes.
    Verifies that all serializers are model serializer for the final_model.
    Ensures the ordering of fields corresponds to the ordering of classes.

    final_model -> model class every serializer's Meta.model must be
    serializer_classes -> serializer classes to merge, in the order their
                          fields should appear

    Returns a new serializers.ModelSerializer subclass whose Meta.fields is
    the ordered union of the classes' Meta.fields and whose declared fields
    are merged from all classes; when two classes declare the same field,
    the later class overrides the earlier one.

    Raises SerializerErrors.ModelMismatchError when a class targets a
    different model. Calling with no serializer classes yields a serializer
    with an empty field list.
    """
    # Verify that all serializers are model serializer for the final_model
    if not all(c.Meta.model is final_model for c in serializer_classes):
        raise SerializerErrors.ModelMismatchError(
            _("All Meta models of the provided serializer classes should be the same."),
        )

    # Ensure the ordering of fields corresponds to the ordering of classes.
    # The explicit initial value keeps reduce() from raising on an empty input.
    # NOTE(review): a Meta.fields of '__all__' would presumably be split into
    # single characters here - confirm callers always list fields explicitly.
    final_fields = reduce(
        lambda merged, fields: merged | OrderedSet(fields),
        (c.Meta.fields for c in serializer_classes),
        OrderedSet(),
    )

    # dict(**a, **b) raises TypeError on a repeated key, so merge with
    # update() to get the intended "latter overrides" behaviour.
    merged_declared_fields = {}
    for serializer_class in serializer_classes:
        merged_declared_fields.update(serializer_class._declared_fields)

    class MergedSerializer(serializers.ModelSerializer):
        class Meta:
            model = final_model
            fields = (*final_fields,)

    MergedSerializer._declared_fields = merged_declared_fields
    return MergedSerializer
# TODO: change into initializer-decorated class as decoration
def FieldOrdering(field_order):
    """
    Builds a ModelSerializer subclass that validates a field ordering.

    On instantiation the returned class checks that field_order is non-empty
    and names exactly the same set of fields as the serializer's Meta.fields,
    raising SerializerErrors.UnequalFieldSetsError otherwise.
    """
    class FieldOrdering(serializers.ModelSerializer):
        def __init__(self, *args, **kwargs):
            ordering_matches = bool(field_order) and set(field_order) == set(self.Meta.fields)
            if not ordering_matches:
                raise SerializerErrors.UnequalFieldSetsError(
                    _("Fields provided in ordering are not the same as fields of this serializers."),
                )
            # Remove initializer from method
            super().__init__(*args, **kwargs)

    # Uncomment after TODO
    # @SerializerErrors.UnequalFieldSetsError.check(field_order,
    #     methods=['__init__'],
    #     error_message=_("Fields provided in ordering are not the same as fields of this serializers."),)
    # class FieldOrdering(serializers.ModelSerializer): pass
    return FieldOrdering
"""
PostgreSQL Handler
2018-10-23
=======
Connects to the PostgreSQL DB specified.
Handles Select, Insert, Update, Delete operations.
"""
import functools
import os

import psycopg2
class Variable:
    """Module-wide mutable state shared by the decorators."""

    # Number of counted queries since the last commit; incremented by
    # Decorator.CountQuery and reset by Decorator.CommitTransaction.
    QueryAmount = 0
class Decorator:
    """
    Decorators for the database operations of a handler.

    Every decorated function receives the handler as its first positional
    argument; the decorators use its ``connection``, ``cursor`` and
    ``Connect`` members.
    """

    @staticmethod
    def HandleDatabaseException(operation):
        """
        Wraps DB operations with exception handling.

        Any exception raised by the operation is printed, the open
        transaction is rolled back and None is returned instead.
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            try:
                return operation(handler, *args, **kwargs)
            except Exception as error:
                print(error)
                # The connection is still None when connecting itself failed,
                # so there is nothing to roll back in that case.
                if handler.connection is not None:
                    try:
                        handler.connection.rollback()
                    except psycopg2.Error as rollbackError:
                        # A broken connection cannot roll back either; report
                        # it instead of escaping the "returns None" contract.
                        print(rollbackError)
                return None
        return wrapped

    @staticmethod
    def EnsureConnection(operation, maxAttempts=3):
        """
        Ensures that the DB connection is established.
        Reconnects if disconnected.

        The connection is probed with "SELECT 1". When the probe fails or no
        cursor exists yet, handler.Connect() is called, at most maxAttempts
        times, so an unreachable database cannot trap the caller in an
        endless loop. The operation is invoked afterwards in any case.
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            for _attempt in range(maxAttempts):
                if handler.cursor is not None:
                    try:
                        handler.cursor.execute("SELECT 1")
                        break
                    except (psycopg2.OperationalError, psycopg2.InterfaceError):
                        # Lost or closed connection: fall through and reconnect.
                        pass
                handler.Connect()
            return operation(handler, *args, **kwargs)
        return wrapped

    @staticmethod
    def CountQuery(operation):
        """
        Increments Variable.QueryAmount after every call of the operation.
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            result = operation(handler, *args, **kwargs)
            Variable.QueryAmount += 1
            return result
        return wrapped

    @staticmethod
    def CommitTransaction(operation, commitPeriod=1, commitMessage=None):
        """
        Commits once Variable.QueryAmount has reached commitPeriod.

        commitPeriod -> number of counted queries between two commits
        commitMessage -> optional text printed right before each commit
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            result = operation(handler, *args, **kwargs)
            # ">=" so that a counter which overshot the period still commits.
            if Variable.QueryAmount >= commitPeriod:
                if commitMessage:
                    print(commitMessage)
                # Nothing to commit while the connection was never opened.
                if handler.connection is not None:
                    handler.connection.commit()
                Variable.QueryAmount = 0
            return result
        return wrapped
class Type:
    """Namespace for value wrapper types used when building queries."""

    class Expression:
        """
        Marks a string as an expression rather than a plain string value;
        Handler.TransformValue returns its text (``s``) without quoting.
        """

        def __init__(self, expressionString):
            # Expression text, emitted as-is.
            self.s = expressionString
class Configuration:
    """Sources of connection settings for Handler."""

    @staticmethod
    def Environment():
        """
        Reads the connection settings from environment variables.

        Returns a dict with the keys 'host', 'port', 'dbname', 'user' and
        'password'. Unset variables fall back to the defaults listed below;
        note that 'port' is the int 5432 by default but a string when PGPORT
        is set.
        """
        # (result key, environment variable, default)
        settings = (
            ('host', 'PGHOST', '127.0.0.1'),
            ('port', 'PGPORT', 5432),
            ('dbname', 'POSTGRES_DB', 'postgres'),
            ('user', 'POSTGRES_USER', 'postgres'),
            ('password', 'POSTGRES_PASSWORD', ''),
        )
        return {key: os.getenv(variable, default) for key, variable, default in settings}
class Handler:
    """
    Connects to a PostgreSQL DB and runs Select, Insert, Update and Delete.

    Queries are assembled as SQL text. Table names, field names, ordering and
    condition strings are inserted verbatim and therefore must come from
    trusted code; only values passed through TransformValue are quoted.
    """

    def __init__(self, configs):
        """
        Stores the connection settings and opens the connection.

        configs -> mapping with the keys 'host', 'port', 'dbname', 'user', 'password'
        """
        self.connection, self.cursor = None, None
        self.host = configs['host']
        self.port = configs['port']
        self.dbname = configs['dbname']
        self.user = configs['user']
        self.password = configs['password']
        self.Connect()

    @Decorator.HandleDatabaseException
    def Connect(self):
        """
        Opens a new connection with the stored settings and creates a cursor on it.
        """
        self.connection = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=self.user,
            password=self.password,
            sslmode='allow'
        )
        self.cursor = self.connection.cursor()

    @staticmethod
    def _WhereClause(conditions):
        """
        Renders [['c1', 'c2'], ['c3', 'c4']] as " WHERE (c1 AND c2) OR (c3 AND c4)".
        Returns an empty string when conditions is None.
        """
        if conditions is None:
            return ''
        return ' WHERE ' + ' OR '.join(f"({' AND '.join(group)})" for group in conditions)

    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Select(self, tableName, fields="*", conditions=None, order=None, limit=None, offset=None, debug=False, raw=False):
        """
        fields -> ['field1', 'field2', 'field3'], or a ready-made column list string such as "*"
        conditions -> [['c1', 'c2'], ['c3', 'c4']] => (c1 AND c2) OR (c3 AND c4)

        Returns one dict per row. With raw=True the query text is returned
        instead and nothing is executed.
        """
        # A string is used verbatim; joining it would split it into characters.
        if isinstance(fields, str):
            fieldList = fields
        else:
            fields = list(fields)
            fieldList = ', '.join(fields)
        query = (
            f"SELECT {fieldList} "
            f"FROM {tableName} "
            f"{self._WhereClause(conditions or None)} "
            + (f"ORDER BY {order} " if order else "")
            + (f"LIMIT {limit} " if limit else "")
            + (f"OFFSET {offset} " if offset else "")
        )
        if debug:
            print(query)
        if raw:
            return query
        self.cursor.execute(query)
        if isinstance(fields, str):
            # Take the column names from the result itself: they are guaranteed
            # to line up with the row values, unlike an unordered
            # information_schema lookup keyed by table name only.
            fieldNames = [column[0] for column in self.cursor.description]
        else:
            fieldNames = fields
        return [dict(zip(fieldNames, row)) for row in self.cursor.fetchall()]

    @Decorator.CommitTransaction
    @Decorator.CountQuery
    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Insert(self, tableName, fieldValueDict, debug=False, raw=False):
        """
        fieldValueDict -> {'field1': 'value1', 'field2': 'value2'}

        Values are rendered with TransformValue. An empty dict is a no-op
        that returns True.
        """
        # dict views never compare equal to [], so test the dict itself.
        if not fieldValueDict:
            return True
        fields, values = fieldValueDict.keys(), fieldValueDict.values()
        query = (
            f"INSERT INTO {tableName} "
            f"({', '.join(fields)}) "
            f"VALUES ({', '.join(map(self.TransformValue, values))}) "
        )
        if debug:
            print(query)
        if raw:
            return query
        self.cursor.execute(query)
        return True

    def TransformValue(self, value):
        """
        Renders a Python value as SQL text.

        None -> NULL
        str -> quoted literal with embedded single quotes doubled
        Type.Expression -> its expression text, unquoted
        anything else -> str(value)
        """
        if value is None:
            return 'NULL'
        if isinstance(value, str):
            # NOTE: doubling quotes is sufficient only while the server runs
            # with standard_conforming_strings=on (the PostgreSQL default).
            return "'" + value.replace("'", "''") + "'"
        if isinstance(value, Type.Expression):
            return value.s
        return str(value)

    @Decorator.CommitTransaction
    @Decorator.CountQuery
    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Update(self, tableName, fieldValueDict, conditions=None, debug=False, raw=False):
        """
        fieldValueDict -> {'field1': 'value1', 'field2': 'value2'}
        conditions -> [['c1', 'c2'], ['c3', 'c4']] => (c1 AND c2) OR (c3 AND c4)

        Only conditions=None updates every row; an empty list renders an
        incomplete WHERE clause, so the statement fails instead of touching
        every row. An empty dict is a no-op that returns True.
        """
        # dict views never compare equal to [], so test the dict itself.
        if not fieldValueDict:
            return True
        assignments = ', '.join(
            f"{field}={self.TransformValue(value)}"
            for field, value in fieldValueDict.items()
        )
        query = (
            f"UPDATE {tableName} "
            f"SET {assignments} "
            f"{self._WhereClause(conditions)}"
        )
        if debug:
            print(query)
        if raw:
            return query
        self.cursor.execute(query)
        return True

    @Decorator.CommitTransaction
    @Decorator.CountQuery
    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Delete(self, tableName, conditions=None, debug=False, raw=False):
        """
        conditions -> [['c1', 'c2'], ['c3', 'c4']] => (c1 AND c2) OR (c3 AND c4)

        Only conditions=None deletes every row; an empty list renders an
        incomplete WHERE clause, so the statement fails instead of deleting
        every row.
        """
        query = (
            f"DELETE FROM {tableName} "
            f"{self._WhereClause(conditions)}"
        )
        if debug:
            print(query)
        if raw:
            return query
        self.cursor.execute(query)
        return True
if __name__ == "__main__":
    # Run as a script: open a handler using the environment-based settings.
    ph = Handler(configs=Configuration.Environment())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment