Skip to content

Instantly share code, notes, and snippets.

@Weizhang2017
Last active December 4, 2019 06:30
Show Gist options
  • Save Weizhang2017/73db44a7c7e6bd86c7aed38516a581e7 to your computer and use it in GitHub Desktop.
Save Weizhang2017/73db44a7c7e6bd86c7aed38516a581e7 to your computer and use it in GitHub Desktop.
Context manager in Python
from pymongo import MongoClient
import time
class MongoConnect:
    """Connect to MongoDB with a context manager.

    Entering the context opens a ``MongoClient`` for ``host``/``port`` and
    yields it; leaving the context closes the client, sleeps for
    ``waitingTime`` seconds and then, if ``callbackOnExit`` is truthy,
    calls it with a single string built from the exception triple.

    Args:
        host: Host passed to ``MongoClient``.
        port: Port passed to ``MongoClient``.
        callbackOnExit: Optional callable invoked on every exit (clean or
            not) with the string ``'{exc_ty}, {exc_val}, {tb}'``.
        waitingTime: Seconds to sleep after closing, before the callback.
    """

    def __init__(self, host='127.0.0.1', port=27017, callbackOnExit=None, waitingTime=0):
        self.host = host
        self.port = port
        # None means "not connected"; __enter__ uses it as a re-entry guard.
        self.con = None
        self.callbackOnExit = callbackOnExit
        self.waitingTime = waitingTime

    def __enter__(self):
        """Open the client and return it.

        Raises:
            RuntimeError: If this manager already holds an open client.
        """
        if self.con is not None:
            raise RuntimeError('Already connected')
        self.con = MongoClient(self.host, self.port)
        return self.con

    def __exit__(self, exc_ty, exc_val, tb):
        """Close the client, wait, then report the exit to the callback.

        Returns None, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        try:
            self.con.close()
        finally:
            # Reset even when close() raises; otherwise the manager would
            # refuse every later __enter__ with 'Already connected'.
            self.con = None
        time.sleep(self.waitingTime)
        if self.callbackOnExit:
            self.callbackOnExit(f'{exc_ty}, {exc_val}, {tb}')
def test():
    """Demo: fetch one document via MongoConnect and print it.

    Uses MongoConnect's default host and port, reads a single document
    from ``email_db.messages_v2`` and prints it. On exit the manager
    sleeps 5 seconds and then passes the exit information to ``print``.
    """
    # The constructor's keyword is callbackOnExit; the former
    # callbackOnFailure raised TypeError before any connection was made.
    con = MongoConnect(callbackOnExit=print, waitingTime=5)
    with con as c:
        doc = c['email_db']['messages_v2'].find_one()
        print(doc)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment