Created
January 18, 2024 07:02
-
-
Save KokoseiJ/2819284c0e801f158f38cbb48c64e944 to your computer and use it in GitHub Desktop.
Simple asynchronous MongoDB ORM for Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import asyncio | |
from pymongo import MongoClient | |
from pymongo.collection import Collection as MongoCollection | |
from pymongo.results import InsertOneResult, UpdateResult, DeleteResult | |
from typing import Awaitable, Self | |
# Shared PyMongo client read by MongoSchema._get_collection. It starts out as
# None; presumably the application assigns a connected MongoClient to this
# module attribute before any schema is used (TODO confirm against callers).
mongo: MongoClient | None = None

# Name of the database in which every schema's collection is looked up.
DATABASE_NAME = "kokomemo"
class DictObject:
    """Attribute container whose fields are declared up front in ``KEYS``.

    Subclasses list their field names in ``KEYS`` (which also fixes the
    positional-argument order) and the subset that must always be present
    in the serialised form in ``REQUIRED``.

    A declared key that was never given a value is simply left unset on the
    instance; reading it yields ``None`` and ``to_dict`` omits it unless it
    is required.
    """

    # Field names, in positional-argument order.
    KEYS: tuple[str, ...] = ()
    # Fields that ``to_dict`` always emits (as ``None`` when unset) and that
    # ``__init__`` insists on when ``required_check`` is true.
    REQUIRED: tuple[str, ...] = ()

    def __init__(self, *args, required_check=False, **kwargs):
        """Populate the declared keys from positional and keyword arguments.

        Args:
            *args: Values matched to ``KEYS`` by position.
            required_check: When true, a key listed in ``REQUIRED`` that is
                supplied neither positionally nor by keyword is an error.
            **kwargs: Values matched to ``KEYS`` by name. Names that are not
                in ``KEYS`` are ignored.

        Raises:
            ValueError: If more positional arguments than ``KEYS`` are given,
                or a required key is missing while ``required_check`` is set.
            RuntimeWarning: If the same key is supplied both positionally and
                by keyword.
        """
        if len(args) > len(self.KEYS):
            raise ValueError("Excessive arguments "
                             f"({len(args)} > {len(self.KEYS)})")

        for i, key in enumerate(self.KEYS):
            has_positional = i < len(args)
            if key in kwargs:
                if has_positional:
                    raise RuntimeWarning(
                        f"Arg `{key}`(==`{args[i]}`) overwritten "
                        f"by kwargs `{kwargs[key]}`"
                    )
                setattr(self, key, kwargs[key])
            elif has_positional:
                setattr(self, key, args[i])
            elif key in self.REQUIRED and required_check:
                raise ValueError(f"Required key {key} missing")
            # Otherwise the key stays unset: storing a placeholder object
            # here would leak it into to_dict() and into truthiness checks.

    def __getattr__(self, name):
        """Read a declared key that was never assigned as ``None``.

        Only invoked when normal attribute lookup fails, so assigned values,
        class-level defaults and properties are unaffected.

        Raises:
            AttributeError: If ``name`` is not one of ``KEYS``.
        """
        # KEYS always resolves through normal lookup (class attribute), so
        # this access cannot recurse back into __getattr__.
        if name in self.KEYS:
            return None
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        )

    def to_dict(self) -> dict:
        """Return the declared keys as a plain ``dict``.

        Keys that hold a value are copied as-is (including an explicit
        ``None``). Unset keys are emitted as ``None`` when listed in
        ``REQUIRED`` and left out otherwise.
        """
        data = {}
        for key in self.KEYS:
            try:
                # Bypass __getattr__ so "unset" can be told apart from an
                # explicitly stored None.
                data[key] = object.__getattribute__(self, key)
            except AttributeError:
                if key in self.REQUIRED:
                    data[key] = None
        return data
class MongoSchema(DictObject):
    """``DictObject`` stored as a single document in a MongoDB collection.

    The blocking PyMongo calls are run in a worker thread so the methods can
    be awaited from asyncio code. Every instance carries an ``_id`` key in
    addition to the keys declared by the subclass.
    """

    # Explicit collection name; when empty, the lower-cased class name is used.
    COLLECTION_NAME: str = ""

    def __init__(self, *args, **kwargs):
        """Ensure ``_id`` is a declared key, then populate the instance.

        Accepts the same arguments as ``DictObject.__init__``.
        """
        if "_id" not in self.KEYS:
            # Build a new tuple: a bare ("_id") is a str, which cannot be
            # concatenated to a tuple. Assigned on the instance, so the
            # class-level KEYS is left untouched.
            self.KEYS = (*self.KEYS, "_id")
        super().__init__(*args, **kwargs)

    @property
    def collection(self) -> Awaitable[MongoCollection]:
        """Awaitable resolving to this schema's PyMongo collection."""
        return self._get_collection()

    @property
    def collection_name(self) -> str:
        """Name of the collection this schema is stored in."""
        return self._get_collection_name()

    @classmethod
    async def find_one(cls, *args, **kwargs) -> Self | None:
        """Fetch one document and wrap it in an instance of this class.

        Arguments are forwarded unchanged to ``Collection.find_one``.

        Returns:
            An instance built from the document, or ``None`` when nothing
            matched.
        """
        collection = await cls._get_collection()
        document = await asyncio.to_thread(
            collection.find_one, *args, **kwargs
        )
        return cls(**document) if document else None

    @classmethod
    async def find(cls, *args, **kwargs) -> tuple[Self, ...]:
        """Fetch every matching document as instances of this class.

        Arguments are forwarded unchanged to ``Collection.find``.
        """
        collection = await cls._get_collection()
        # The cursor is lazy: it must be drained inside the worker thread,
        # otherwise the network round-trips would block the event loop.
        documents = await asyncio.to_thread(
            lambda: list(collection.find(*args, **kwargs))
        )
        return tuple(cls(**document) for document in documents)

    async def commit(self) -> InsertOneResult | UpdateResult:
        """Write this object to the collection.

        With an ``_id`` set, the stored document is replaced (or created,
        via upsert). Without one, the document is inserted and the generated
        id is stored back on ``self._id``.

        Returns:
            The PyMongo result of the replace or insert operation.
        """
        collection = await self.collection
        document_id = getattr(self, "_id", None)
        # Compare against None rather than truthiness so ids such as 0 still
        # count as "already has an id".
        if document_id is not None:
            return await asyncio.to_thread(
                collection.replace_one,
                {"_id": document_id}, self.to_dict(), upsert=True,
            )

        document = self.to_dict()
        if document.get("_id") is None:
            # Let the server/driver generate the id instead of storing null.
            document.pop("_id", None)
        result = await asyncio.to_thread(collection.insert_one, document)
        self._id = result.inserted_id
        return result

    async def delete(self) -> DeleteResult:
        """Delete the stored document whose ``_id`` matches this object.

        Raises:
            RuntimeError: If this object has no ``_id``.
        """
        document_id = getattr(self, "_id", None)
        if document_id is None:
            raise RuntimeError("_id is not defined!")
        collection = await self.collection
        return await asyncio.to_thread(
            collection.delete_one, {"_id": document_id}
        )

    @classmethod
    def _get_collection_name(cls) -> str:
        """Return ``COLLECTION_NAME``, or a name derived from the class name."""
        if cls.COLLECTION_NAME:
            return cls.COLLECTION_NAME
        # NOTE(review): class names cannot contain whitespace, so this
        # substitution looks like a no-op and the result is just the
        # lower-cased class name - confirm whether CamelCase splitting was
        # intended before changing it (existing collections use this name).
        return re.sub(r"\s+", "_", cls.__name__.lower())

    @classmethod
    async def _get_collection(cls) -> MongoCollection:
        """Look up this schema's collection on the module-level client.

        Raises:
            RuntimeError: If the module-level ``mongo`` client is still
                ``None``.
        """
        if mongo is None:
            raise RuntimeError(
                "MongoDB client is not initialised; assign `mongo` first"
            )
        # Database/collection handles are created lazily without network
        # I/O, so no worker thread is needed here.
        database = mongo.get_database(DATABASE_NAME)
        return database.get_collection(cls._get_collection_name())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment