Skip to content

Instantly share code, notes, and snippets.

@ogabrielguerra
Last active February 7, 2024 01:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ogabrielguerra/6d26a53ad7a92317ed23b31d11c15b53 to your computer and use it in GitHub Desktop.
Save ogabrielguerra/6d26a53ad7a92317ed23b31d11c15b53 to your computer and use it in GitHub Desktop.
Abstracting ObjectId handling and CRUD operations (MongoDB).
import json
from bson.objectid import ObjectId
from pymongo.cursor import Cursor as pymongo_cursor
from pymongo.collection import Collection as pymongo_collection
from pydantic.main import ModelMetaclass
class JSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, ObjectId):
return str(o)
return json.JSONEncoder.default(self, o)
class CoreModel:
def __init__(self, connection) -> None:
self.connection = connection
@staticmethod
def map_oid_to_id(response_model: ModelMetaclass, data: pymongo_collection) -> list:
response = []
for document in data:
e = JSONEncoder().encode(document)
d = json.loads(e)
response.append(response_model(**d))
return response
def get_data_by_query(self, collection_reference: str, query: object) -> pymongo_cursor:
mongo_collection: pymongo_collection = self.connection[collection_reference]
return mongo_collection.find(query)
#
# USAGE
#
from app.model.coreModel import CoreModel
from app.model.partnerModel import PartnerModel
from pymongo.cursor import Cursor as pymongo_cursor
class Partner:
def __init__(self, connection) -> None:
self.__CONNECTION = connection
self.__DEFAULT_COLLECTION = 'partner'
def index(self) -> list:
core_model: CoreModel = CoreModel(self.__CONNECTION)
query: object = {}
cursor: pymongo_cursor = core_model.get_data_by_query(self.__DEFAULT_COLLECTION, query)
return CoreModel.map_oid_to_id(PartnerModel, cursor)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment