Skip to content

Instantly share code, notes, and snippets.

@KokoseiJ
Created January 18, 2024 07:02
Show Gist options
  • Save KokoseiJ/2819284c0e801f158f38cbb48c64e944 to your computer and use it in GitHub Desktop.
Save KokoseiJ/2819284c0e801f158f38cbb48c64e944 to your computer and use it in GitHub Desktop.
A simple asynchronous MongoDB ORM for Python
import re
import asyncio
from pymongo import MongoClient
from pymongo.collection import Collection as MongoCollection
from pymongo.results import InsertOneResult, UpdateResult, DeleteResult
from typing import Awaitable, Self
# Shared PyMongo client used by MongoSchema._get_collection. It starts out as
# None; presumably the application assigns a real client at start-up (not shown
# in this file) -- TODO confirm.
mongo: MongoClient | None = None
# Name of the database that MongoSchema._get_collection asks the client for.
DATABASE_NAME = "kokomemo"
class DictObject:
    """Simple record whose attribute names are declared in ``KEYS``.

    Subclasses list their attribute names in ``KEYS`` (which also fixes the
    order of positional constructor arguments) and the mandatory subset in
    ``REQUIRED``. Every key is always set as an instance attribute; keys that
    were not supplied are set to ``None``.
    """

    # Attribute names, in positional-argument order.
    KEYS: tuple[str, ...] = ()
    # Keys that must be supplied when ``required_check`` is true, and that
    # ``to_dict`` always emits (as ``None`` when they have no value).
    REQUIRED: tuple[str, ...] = ()

    def __init__(self, *args, required_check=False, **kwargs):
        """Populate the attributes named in ``KEYS``.

        Args:
            *args: Values assigned to ``KEYS`` by position.
            required_check: When true, a key listed in ``REQUIRED`` that is
                given neither positionally nor by keyword is an error.
            **kwargs: Values assigned to ``KEYS`` by name. Names that are not
                in ``KEYS`` are ignored.

        Raises:
            ValueError: More positional arguments than ``KEYS`` were given, or
                a required key is missing while ``required_check`` is true.
            RuntimeWarning: The same key was given both positionally and by
                keyword (raised as an exception, not emitted as a warning).
        """
        if len(args) > len(self.KEYS):
            raise ValueError("Excessive arguments "
                             f"({len(args)} > {len(self.KEYS)})")

        for i, key in enumerate(self.KEYS):
            has_positional = i < len(args)
            if key in kwargs:
                value = kwargs[key]
                if has_positional:
                    raise RuntimeWarning(
                        f"Arg `{key}`(==`{args[i]}`) overwritten "
                        f"by kwargs `{value}`"
                    )
            elif has_positional:
                value = args[i]
            elif required_check and key in self.REQUIRED:
                raise ValueError(f"Required key {key} missing")
            else:
                # Unset keys become None rather than a private placeholder, so
                # attribute access and to_dict() never see a sentinel object.
                value = None
            setattr(self, key, value)

    def to_dict(self) -> dict:
        """Return the declared attributes as a plain ``dict``.

        Keys in ``REQUIRED`` are always present (``None`` when unset).
        Optional keys whose value is ``None`` or whose attribute is missing
        are left out.
        """
        data = {}
        for key in self.KEYS:
            value = getattr(self, key, None)
            if value is not None or key in self.REQUIRED:
                data[key] = value
        return data
class MongoSchema(DictObject):
    """Base class for documents stored in a single MongoDB collection.

    The collection is looked up on the module-level ``mongo`` client in the
    ``DATABASE_NAME`` database. Blocking PyMongo calls are pushed to the event
    loop's default executor so the coroutines do not block the loop.
    ``_id`` is always part of ``KEYS`` and is ``None`` until it is supplied
    or assigned by ``commit``.
    """

    # Explicit collection name; when empty, the lower-cased class name is used.
    COLLECTION_NAME: str = ""

    def __init__(self, *args, **kwargs):
        """Build the document, guaranteeing an ``_id`` key and attribute.

        Accepts the same arguments as ``DictObject.__init__``.
        """
        if "_id" not in self.KEYS:
            # Append as a one-element tuple; a bare parenthesised string
            # cannot be concatenated to a tuple.
            self.KEYS = (*self.KEYS, "_id")
        # Default `_id` to None unless the caller passed it positionally
        # (passing it both ways is rejected by the base class).
        if self.KEYS.index("_id") >= len(args):
            kwargs.setdefault("_id", None)
        super().__init__(*args, **kwargs)

    @property
    def collection(self) -> Awaitable[MongoCollection]:
        """Awaitable that resolves to this document's collection."""
        return self._get_collection()

    @property
    def collection_name(self) -> str:
        """Name of the collection this document is stored in."""
        return self._get_collection_name()

    @classmethod
    async def find_one(cls, *args, **kwargs) -> Self | None:
        """Run ``Collection.find_one`` and wrap the result.

        All arguments are forwarded to PyMongo unchanged.

        Returns:
            An instance built from the matching document, or ``None`` when
            nothing matched.
        """
        loop = asyncio.get_running_loop()
        collection = await cls._get_collection()
        document = await loop.run_in_executor(
            None, lambda: collection.find_one(*args, **kwargs)
        )
        return cls(**document) if document else None

    @classmethod
    async def find(cls, *args, **kwargs) -> tuple[Self, ...]:
        """Run ``Collection.find`` and wrap every matching document.

        All arguments are forwarded to PyMongo unchanged.

        Returns:
            A tuple with one instance per matching document.
        """
        loop = asyncio.get_running_loop()
        collection = await cls._get_collection()
        # The cursor is lazy: drain it inside the worker thread so the
        # network round-trips do not happen on the event loop.
        documents = await loop.run_in_executor(
            None, lambda: list(collection.find(*args, **kwargs))
        )
        return tuple(cls(**document) for document in documents)

    async def commit(self) -> InsertOneResult | UpdateResult:
        """Persist this document.

        With an ``_id`` the stored document is replaced (upserted); without
        one the document is inserted and ``_id`` is set to the generated id.

        Returns:
            The PyMongo result of the replace or insert operation.
        """
        loop = asyncio.get_running_loop()
        collection = await self.collection
        document = self.to_dict()

        if self._id is not None:
            return await loop.run_in_executor(
                None,
                lambda: collection.replace_one(
                    {"_id": self._id}, document, upsert=True
                ),
            )

        # Never send a null `_id`; let MongoDB generate one instead.
        document.pop("_id", None)
        result = await loop.run_in_executor(
            None, collection.insert_one, document
        )
        self._id = result.inserted_id
        return result

    async def delete(self) -> DeleteResult:
        """Delete the stored document identified by ``_id``.

        Raises:
            RuntimeError: The document has no ``_id``.
        """
        if self._id is None:
            raise RuntimeError("_id is not defined!")

        loop = asyncio.get_running_loop()
        collection = await self.collection
        return await loop.run_in_executor(
            None, collection.delete_one, {"_id": self._id}
        )

    @classmethod
    def _get_collection_name(cls) -> str:
        """Return ``COLLECTION_NAME`` or a name derived from the class name."""
        if cls.COLLECTION_NAME:
            return cls.COLLECTION_NAME
        # Whitespace runs become underscores; ordinary class names contain
        # none, so this is normally just the lower-cased class name.
        return re.sub(r"\s+", "_", cls.__name__.lower())

    @classmethod
    async def _get_collection(cls) -> MongoCollection:
        """Resolve the PyMongo collection for this class.

        Kept as a coroutine so existing ``await`` call sites keep working.

        Raises:
            RuntimeError: The module-level ``mongo`` client is still ``None``.
        """
        if mongo is None:
            raise RuntimeError("MongoDB client `mongo` is not initialised")
        # get_database/get_collection only build handle objects and do no
        # network I/O, so no executor hop is needed here.
        database = mongo.get_database(DATABASE_NAME)
        return database.get_collection(cls._get_collection_name())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment