Last active
March 1, 2021 20:12
-
-
Save lelandbatey/1135c9ce89892140f63653648fd0031f to your computer and use it in GitHub Desktop.
MongoDB, Motor, Tornado web server, all in Python; for testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# In order to test an upgrade from Motor 1.x to Motor 2.1, we want to have | |
# automated tests verifying our existing MongoDB scaffolding isn't using any of | |
# the deprecated/removed features of PyMongo/Motor. Since Python is dynamic, | |
# and Motor is ESPECIALLY dynamic, using simple static analysis (such as with | |
# Vulture) can't give us accurate results. So instead, we must write unit tests | |
# which exercise the methods of common_python.mongo.Client, and by exercising | |
# them, verify that the fields/methods of the underlying MotorClient are being | |
# called correctly. | |
# | |
# However, we also want our unit tests to be _truly_ idempotent AND to test | |
# against a real instance of MongoDB. To get both of these things, we can use | |
# the nifty pymongo_inmemory library. | |
# | |
# This file acts as a sketch/outline showing how (using only Python) we can | |
# have a test Tornado server make requests against a temporary MongoDB server | |
# of our choosing. | |
# | |
# Run this file, then run: | |
# $ curl localhost:8888/ | |
# [{"_id": 1, "name": "zap", "value": "pow"}] | |
from pprint import pprint | |
import json | |
import os | |
from pymongo import MongoClient | |
from motor.motor_tornado import MotorClient | |
from pymongo_inmemory import Mongod | |
import tornado.httpserver | |
import tornado.ioloop | |
import tornado.options | |
import tornado.web | |
from tornado.gen import coroutine | |
from tornado.options import define, options | |
define("port", default=8888, help="run on the given port", type=int) | |
class MainHandler(tornado.web.RequestHandler): | |
def initialize(self, motor_client): | |
self.motrcl = motor_client | |
self.db = motor_client.get_database('example') | |
@coroutine | |
def get(self): | |
col = self.db.get_collection('foo') | |
cursor = col.find({}) | |
documents = list() | |
while (yield cursor.fetch_next): | |
document = cursor.next_object() | |
documents.append(document) | |
self.write(json.dumps(documents)) | |
def main(): | |
os.environ['PYMONGOIM__MONGO_VERSION'] = '4.0.10' | |
with Mongod() as dbp: | |
# insert some temporary resources | |
mc = MongoClient(dbp.connection_string) | |
db = mc.get_database('example') | |
col = db.get_collection('foo') | |
res = col.replace_one({'_id': 1}, {'_id': 1, 'name': 'zap', 'value': 'pow'}, upsert=True) | |
print(res) | |
pprint(res) | |
print(dir(res)) | |
pprint(dir(res)) | |
motrcl = MotorClient(dbp.connection_string) | |
tornado.options.parse_command_line() | |
application = tornado.web.Application([(r"/", MainHandler, dict(motor_client=motrcl))]) | |
http_server = tornado.httpserver.HTTPServer(application) | |
http_server.listen(options.port) | |
tornado.ioloop.IOLoop.current().start() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment