Skip to content

Instantly share code, notes, and snippets.

@iKunalChhabra
Last active December 20, 2022 07:18
Show Gist options
  • Save iKunalChhabra/0abd3433f716657a4902e20670f754b7 to your computer and use it in GitHub Desktop.
Save iKunalChhabra/0abd3433f716657a4902e20670f754b7 to your computer and use it in GitHub Desktop.
Example class to connect to mongo and do various operations
import pymongo
import pandas as pd
class Mongo:
def __init__(self, db_name, host, port):
self.__db_name = db_name
self.__host = host
self.__port = port
self.__client = pymongo.MongoClient(self.__host, self.__port)
self.__db = self.__client[self.__db_name]
self.__collection = None
@property
def collection(self):
if self.__collection is None:
raise Exception("Collection not initialized")
return self.__collection
@collection.setter
def collection(self, collection_name):
self.__collection = self.__db[collection_name]
def insert_many(self, data):
print('\nInserting data')
print('Data: ', data)
self.collection.insert_many(data)
print('Inserted data')
def insert_one(self, data):
print('\nInserting data')
print('Data: ', data)
self.collection.insert_one(data)
print('Inserted data')
def upsert_one(self, pk, data):
print('\nUpserting data')
print('PK: ', pk)
print('Data: ', data)
self.collection.find_one_and_replace(filter={pk:data[pk]}, replacement=data, upsert=True)
print('Upserted data')
def upsert_many(self, pk, data):
print('\nUpserting data')
print('PK: ', pk)
print('Data: ', data)
for record in data:
self.collection.find_one_and_replace(filter={pk:record[pk]}, replacement=record, upsert=True)
print('Upserted data')
def find(self, filter={}, type='dict'):
print('\nFinding data')
print('filter: ', filter)
data_dict = self.collection.find(filter)
print('Found data')
if type == 'dict':
return list(data_dict)
elif type == 'DataFrame':
return pd.DataFrame(list(data_dict))
def count(self, filter={}):
print('\nCounting data')
print('filter: ', filter)
count = self.collection.count_documents(filter)
print('Counted data')
return count
def delete_many(self, filter):
print('\nDeleting data')
print('filter: ', filter)
self.collection.delete_many(filter)
print('Deleted data')
def delete_one(self, filter):
print('\nDeleting data')
print('filter: ', filter)
self.collection.delete_one(filter)
print('Deleted data')
if __name__ == '__main__':
mongo = Mongo(db_name='test', host='localhost', port=27017)
mongo.collection = 'myapp'
count = mongo.count(filter={'country': 'India'})
print('Count: ', count)
mongo.delete_many(filter={'country': 'India'})
mongo.upsert_one(pk='id', data={'name': 'Sam', 'age': 54, 'id': 44, 'country': 'India'})
mongo.upsert_one(pk='id', data={'name': 'Rahul', 'age': 44, 'country': 'India', 'id': 66})
mongo.upsert_one(pk='id', data={'name': 'Jim', 'age': 56, 'country': 'USA', 'id': 77})
mongo.upsert_one(pk='id', data={'name': 'Jim', 'age': 56, 'country': 'USA', 'id': 88})
mongo.upsert_one(pk='id', data={'name': 'Johny', 'age': 29, 'country': 'Canada', 'id': 101})
data = [
{'name': 'Sammy', 'age': 54, 'id': 44, 'country': 'India'},
{'name': 'Rahul', 'age': 44, 'country': 'India', 'id': 66},
{'name': 'Jim', 'age': 56, 'country': 'USA', 'id': 77},
{'name': 'Jim', 'age': 56, 'country': 'USA', 'id': 88},
{'name': 'Johny', 'age': 29, 'country': 'Canada', 'id': 101}
]
mongo.upsert_many(pk='id', data=data)
df = mongo.find(type='DataFrame')
print(df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment