Created
February 9, 2025 09:51
-
-
Save erincar/e22cdb32502f38d1b23e3108d448498e to your computer and use it in GitHub Desktop.
Old web doodles
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import reduce | |
from ordered_set import OrderedSet # TODO: add to requirements | |
from django.utils.translation import gettext as _ | |
# TODO: analyze inbound serializers for use cases | |
# TODO: analyze outbound serializers for use cases | |
# TODO: add case variables to exception messages | |
# define an exception with its check method, s.t. | |
# @Error.check(parameters) decorator checks for errors in a method, | |
# raises the Error if necessary | |
class CheckingError(Exception):
    """
    Base class for exceptions that are meant to carry their own check decorator.

    The decorator is not implemented yet: check() currently accepts its
    arguments and returns None without doing anything.
    """

    def check(self, methods=None, error_message=None):  # TODO: accept decorated method/class
        """
        Placeholder for the future error-checking decorator factory.

        methods       -> names of the methods to guard (unused for now)
        error_message -> message for the raised error (unused for now)
        """
        # TODO: if methods are defined, treat decorated object as class, treat as method otherwise
        # TODO: decorate all methods (or object itself) with
        # TODO: return the decorator
class SerializerErrors:
    """Namespace grouping the exceptions raised by the serializer helpers."""

    class ModelMismatchError(Exception):
        """Raised when combined serializers do not all target the same model."""

    class UnequalFieldSetsError(Exception):
        """Raised when a requested field ordering does not cover exactly the serializer's fields."""
# TODO: handle custom fields of serializers | |
# TODO: handle conflicts between custom fields of different purpose (currently, latter overrides) | |
# TODO: minimize occurrence for this case
# TODO: handle partial orderings (e.g. S1->a,c | S2->b, (S1, S2) -> a,b,c) | |
# TODO: allow for custom ordering or set a standard ordering (e.g., given order) | |
# Solution: Create a FieldOrdering class to override all previous order, to be extended last | |
# Solution 2: Field ordering class decorator that decorates serializer init with Meta.fields alteration | |
# (not sure if init will be enough) | |
def CombinedModelSerializer(final_model, *serializer_classes):
    """
    Creates and returns combined model serializer with given serializer classes.
    Verifies that all serializers are model serializer for the final_model.
    Ensures the ordering of fields corresponds to the ordering of classes:
    a field keeps the position of its first occurrence.
    Declared fields are merged in class order; on a name clash the
    declaration of the later class overrides the earlier one.

    Raises SerializerErrors.ModelMismatchError if any class has another Meta.model.
    """
    # Verify that all serializers are model serializer for the final_model
    if not all(c.Meta.model is final_model for c in serializer_classes):
        raise SerializerErrors.ModelMismatchError(
            _("All Meta models of the provided serializer classes should be the same."),
        )

    # Ensure the ordering of fields corresponds to the ordering of classes.
    # The explicit initial value keeps reduce() from failing when no classes are given.
    final_fields = reduce(
        lambda merged, fields: merged | OrderedSet(fields),
        (c.Meta.fields for c in serializer_classes),
        OrderedSet(),
    )

    # dict(**a, **b) raises TypeError on duplicate keys; update() lets the
    # later class override, which is the documented current behaviour.
    declared_fields = {}
    for serializer_class in serializer_classes:
        declared_fields.update(serializer_class._declared_fields)

    class MergedSerializer(serializers.ModelSerializer):
        class Meta:
            model = final_model
            fields = (*final_fields,)

    MergedSerializer._declared_fields = declared_fields
    return MergedSerializer
# TODO: change into initializer-decorated class as decoration | |
def FieldOrdering(field_order):
    """
    Builds a ModelSerializer subclass that checks field_order on instantiation.

    The check requires field_order to be non-empty and to contain exactly the
    same names as the serializer's Meta.fields; otherwise
    SerializerErrors.UnequalFieldSetsError is raised. The class only validates,
    it does not reorder any fields itself.
    """
    class FieldOrdering(serializers.ModelSerializer):
        def __init__(self, *args, **kwargs):
            ordering_matches = bool(field_order) and set(field_order) == set(self.Meta.fields)
            if not ordering_matches:
                raise SerializerErrors.UnequalFieldSetsError(
                    _("Fields provided in ordering are not the same as fields of this serializers."),
                )
            # Remove initializer from method
            super().__init__(*args, **kwargs)

    # Uncomment after TODO
    # @SerializerErrors.UnequalFieldSetsError.check(field_order,
    #     methods=['__init__'],
    #     error_message=_("Fields provided in ordering are not the same as fields of this serializers."),)
    # class FieldOrdering(serializers.ModelSerializer): pass
    return FieldOrdering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
PostgreSQL Handler | |
2018-10-23 | |
======= | |
Connects to the PostgreSQL DB specified. | |
Handles Select, Insert, Update, Delete operations. | |
""" | |
import functools
import os

import psycopg2
class Variable:
    """Module-wide mutable state shared by the decorators."""

    # Number of counted queries since the counter was last reset
    QueryAmount: int = 0
class Decorator:
    """
    Decorators for DB operations.

    Every wrapper expects the decorated callable's first positional argument
    ("handler") to expose `connection`, `cursor` and `Connect()`.
    """

    @staticmethod
    def HandleDatabaseException(operation):
        """
        Wraps DB operations with exception handling.

        Any exception raised by the operation is printed, the open transaction
        is rolled back (when a connection exists) and None is returned instead
        of propagating the error.
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            try:
                return operation(handler, *args, **kwargs)
            except Exception as error:
                print(error)
                # connection is still None when the very first connect failed;
                # calling rollback() on it would mask the real error.
                if handler.connection is not None:
                    handler.connection.rollback()
                return None
        return wrapped

    @staticmethod
    def EnsureConnection(operation):
        """
        Ensures that the DB connection is established.
        Reconnects if disconnected.

        The connection is probed with "SELECT 1" before the operation runs;
        the probe is repeated after every reconnect attempt until it succeeds.
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            while True:
                try:
                    handler.cursor.execute("SELECT 1")
                    break
                # InterfaceError is what psycopg2 raises for an already closed
                # connection/cursor; OperationalError covers a dropped link.
                except (psycopg2.OperationalError, psycopg2.InterfaceError):
                    handler.Connect()
            return operation(handler, *args, **kwargs)
        return wrapped

    @staticmethod
    def CountQuery(operation):
        """
        Increments Variable.QueryAmount after each call of the operation.
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            result = operation(handler, *args, **kwargs)
            Variable.QueryAmount += 1
            return result
        return wrapped

    @staticmethod
    def CommitTransaction(operation, commitPeriod=1, commitMessage=None):
        """
        Commits the transaction once commitPeriod queries have been counted.

        operation     -> the DB operation to wrap
        commitPeriod  -> number of counted queries between two commits
        commitMessage -> optional text printed right before each commit
        """
        @functools.wraps(operation)
        def wrapped(handler, *args, **kwargs):
            result = operation(handler, *args, **kwargs)
            # >= rather than ==: a counter that overshot the period must still
            # trigger a commit instead of never matching again.
            if Variable.QueryAmount >= commitPeriod:
                if commitMessage:
                    print(commitMessage)
                handler.connection.commit()
                Variable.QueryAmount = 0
            return result
        return wrapped
class Type:
    """Namespace for value wrapper types used when building queries."""

    class Expression:
        """
        Wraps a string that Handler.TransformValue emits as-is (unquoted).
        """

        def __init__(self, expressionString):
            # The wrapped text is kept verbatim in `s`
            self.s = expressionString
class Configuration:
    """Factories for Handler connection settings."""

    @staticmethod
    def Environment():
        """
        Reads the connection settings from environment variables.

        Returns a dict with the keys 'host', 'port', 'dbname', 'user' and
        'password'; a variable that is not set falls back to its default.
        """
        # (result key, environment variable, fallback)
        settings = (
            ('host', 'PGHOST', '127.0.0.1'),
            ('port', 'PGPORT', 5432),
            ('dbname', 'POSTGRES_DB', 'postgres'),
            ('user', 'POSTGRES_USER', 'postgres'),
            ('password', 'POSTGRES_PASSWORD', ''),
        )
        return {key: os.getenv(variable, fallback) for key, variable, fallback in settings}
class Handler:
    """
    Wraps a psycopg2 connection/cursor pair and builds the SQL text for
    Select, Insert, Update and Delete operations.

    NOTE: table names, field names, conditions and order/limit/offset are
    interpolated into the SQL verbatim; only values that go through
    TransformValue are quoted. Never pass untrusted input for the former.
    """

    def __init__(self, configs):
        """
        Stores the connection settings and connects right away.

        configs -> mapping with the keys 'host', 'port', 'dbname', 'user', 'password'
        """
        self.connection, self.cursor = None, None
        self.host = configs['host']
        self.port = configs['port']
        self.dbname = configs['dbname']
        self.user = configs['user']
        self.password = configs['password']
        self.Connect()

    @Decorator.HandleDatabaseException
    def Connect(self):
        """
        Opens a new connection with the stored settings and a cursor on it.
        When connecting raises, connection and cursor keep their previous values.
        """
        self.connection = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=self.user,
            password=self.password,
            sslmode='allow'
        )
        self.cursor = self.connection.cursor()

    @staticmethod
    def _WhereClause(conditions):
        """
        Renders condition groups as ' WHERE (c1 AND c2) OR (c3 AND c4)'.
        """
        groups = [f"({' AND '.join(group)})" for group in conditions]
        return ' WHERE ' + ' OR '.join(groups)

    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Select(self, tableName, fields="*", conditions=None, order=None, limit=None, offset=None, debug=False, raw=False):
        """
        fields -> ['field1', 'field2', 'field3'] or a single select expression string
        conditions -> [['c1', 'c2'], ['c3', 'c4']] => (c1 AND c2) OR (c3 AND c4)

        Returns one dict per row (field name -> value), or the query text
        when raw is set. debug prints the query before it is used.
        """
        # A plain string is one select expression; joining it would split it
        # into single characters.
        columns = fields if isinstance(fields, str) else ', '.join(fields)
        where = self._WhereClause(conditions) if conditions else ''
        query = f"""SELECT {columns} """ + \
                f"""FROM {tableName} """ + \
                f"""{where} """ + \
                (f"""ORDER BY {order} """ if order else "") + \
                (f"""LIMIT {limit} """ if limit else "") + \
                (f"""OFFSET {offset} """ if offset else "")
        if debug: print(query)
        if raw: return query
        self.cursor.execute(query)
        if isinstance(fields, str):
            # Take the names from the result itself: they are guaranteed to
            # line up with the row values, unlike an unordered lookup in
            # information_schema (which also mixes same-named tables of
            # different schemas).
            fieldNames = [column[0] for column in self.cursor.description]
        else:
            fieldNames = fields
        return [dict(zip(fieldNames, row)) for row in self.cursor.fetchall()]

    @Decorator.CommitTransaction
    @Decorator.CountQuery
    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Insert(self, tableName, fieldValueDict, debug=False, raw=False):
        """
        fieldValueDict -> {'field1': value1, 'field2': value2}

        Returns True on success, or the query text when raw is set.
        Nothing is executed for an empty fieldValueDict.
        """
        # dict views never compare equal to [], so test the dict itself.
        if not fieldValueDict: return True
        fields = ', '.join(fieldValueDict.keys())
        values = ', '.join(map(self.TransformValue, fieldValueDict.values()))
        query = f"""INSERT INTO {tableName} """ + \
                f"""({fields}) """ + \
                f"""VALUES ({values}) """
        if debug: print(query)
        if raw: return query
        self.cursor.execute(query)
        return True

    def TransformValue(self, value):
        """
        Renders a Python value as SQL literal text.

        Type.Expression -> its text, unquoted
        None            -> NULL
        str             -> single-quoted, embedded quotes doubled
        anything else   -> str(value)
        """
        if isinstance(value, Type.Expression):
            return value.s
        if value is None:
            return 'NULL'
        if isinstance(value, str):
            # Doubling the quote keeps a value like O'Brien from terminating
            # the literal early.
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    @Decorator.CommitTransaction
    @Decorator.CountQuery
    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Update(self, tableName, fieldValueDict, conditions=None, debug=False, raw=False):
        """
        fieldValueDict -> {'field1': value1, 'field2': value2}
        conditions -> [['c1', 'c2'], ['c3', 'c4']] => (c1 AND c2) OR (c3 AND c4)

        Returns True on success, or the query text when raw is set.
        Nothing is executed for an empty fieldValueDict.
        """
        # dict views never compare equal to [], so test the dict itself.
        if not fieldValueDict: return True
        assignments = ', '.join(
            f"{field}={self.TransformValue(value)}" for field, value in fieldValueDict.items()
        )
        where = '' if conditions is None else self._WhereClause(conditions)
        query = f"""UPDATE {tableName} """ + \
                f"""SET {assignments} """ + \
                where
        if debug: print(query)
        if raw: return query
        self.cursor.execute(query)
        return True

    @Decorator.CommitTransaction
    @Decorator.CountQuery
    @Decorator.EnsureConnection
    @Decorator.HandleDatabaseException
    def Delete(self, tableName, conditions=None, debug=False, raw=False):
        """
        conditions -> [['c1', 'c2'], ['c3', 'c4']] => (c1 AND c2) OR (c3 AND c4)

        Without conditions (None) every row of the table is deleted.
        Returns True on success, or the query text when raw is set.
        """
        where = '' if conditions is None else self._WhereClause(conditions)
        query = f"""DELETE FROM {tableName} """ + where
        if debug: print(query)
        if raw: return query
        self.cursor.execute(query)
        return True
# Script entry point: connect using the settings found in the environment.
if __name__ == "__main__":
    ph = Handler(configs=Configuration.Environment())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment