Skip to content

Instantly share code, notes, and snippets.

@vytas7
Last active March 22, 2021 07:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vytas7/7e0a9d24a56a503f90e63d75c14d00fd to your computer and use it in GitHub Desktop.
Save vytas7/7e0a9d24a56a503f90e63d75c14d00fd to your computer and use it in GitHub Desktop.
Testing a simple Falcon+MongoDB document API
# # docker run --rm -it -p 27017:27017 mongo
# $ gunicorn --workers 8 --worker-class=meinheld.gmeinheld.MeinheldWorker test:app
import itertools
import random

import falcon
import pymongo
from bson.errors import InvalidId
from bson.objectid import ObjectId
def take(n, iterable):
    """Collect at most the first ``n`` items of ``iterable`` into a list.

    Fewer than ``n`` items are returned when the iterable runs out early.
    Only the returned items are consumed from ``iterable``.
    """
    head = itertools.islice(iterable, n)
    return list(head)
class Documents:
    """Falcon resource exposing a MongoDB collection as a small document API.

    Responders:

    * ``on_get`` -- list the first page of stored documents.
    * ``on_post`` -- store the request media as a new document.
    * ``on_get_document`` -- fetch one document by the id from the URL.
    * ``on_post_random`` / ``on_get_random`` -- store a document holding a
      random number (load-testing helper).
    """

    # Maximum number of documents returned by a single listing request.
    PAGE_SIZE = 100

    def __init__(self):
        # MongoClient() is created without arguments, i.e. with pymongo's
        # default connection settings. Documents live in the `documents`
        # collection of the database named `database`.
        self._client = pymongo.MongoClient()
        self._documents = self._client.database.documents

    @staticmethod
    def _expose_id(doc):
        """Replace the ``_id`` key of ``doc`` (in place) by a string ``id``."""
        doc['id'] = str(doc.pop('_id'))
        return doc

    def _create(self, resp, document):
        """Insert ``document`` and describe the new resource on ``resp``.

        Sets the ``Location`` header to the document's URL and the status
        to 201 Created.
        """
        ref = self._documents.insert_one(document)
        resp.location = f'/documents/{ref.inserted_id}'
        resp.status = falcon.HTTP_CREATED

    def on_get(self, req, resp):
        """Return up to ``PAGE_SIZE`` documents plus the collection total.

        The response media has the keys ``documents``, ``next`` and
        ``total``. ``next`` is ``None`` unless more than ``PAGE_SIZE``
        documents exist, in which case it points at ``/not-implemented``
        because paging is not implemented.
        """
        count = self._documents.count_documents({})
        next_link = '/not-implemented' if count > self.PAGE_SIZE else None

        documents = take(self.PAGE_SIZE, self._documents.find())
        for doc in documents:
            self._expose_id(doc)

        resp.media = {
            'documents': documents,
            'next': next_link,
            'total': count,
        }

    def on_get_document(self, req, resp, document_id):
        """Return the document identified by ``document_id``.

        Raises:
            falcon.HTTPNotFound: if no stored document has that id.
        """
        # The id arrives from the URL as a string, but ids generated by
        # insert_one() are ObjectIds, so a plain string comparison would
        # never match them. Match either form so that documents POSTed with
        # a client-chosen string ``_id`` stay reachable too.
        candidates = [document_id]
        try:
            candidates.append(ObjectId(document_id))
        except InvalidId:
            # Not a valid ObjectId representation; only the raw string can
            # match.
            pass

        doc = self._documents.find_one({'_id': {'$in': candidates}})
        if doc is None:
            raise falcon.HTTPNotFound()

        resp.media = self._expose_id(doc)

    def on_post(self, req, resp):
        """Store the request media as a new document.

        Raises:
            falcon.HTTPBadRequest: if the request media is not an object
                (``dict``); insert_one() cannot store anything else.
        """
        document = req.get_media()
        if not isinstance(document, dict):
            raise falcon.HTTPBadRequest(
                title='Invalid document',
                description='The request body must be an object.',
            )
        self._create(resp, document)

    def on_post_random(self, req, resp):
        """Store a new document ``{'random': <9-digit number>}``."""
        number = random.randint(100_000_000, 999_999_999)
        self._create(resp, {'random': number})

    # Don't do this...
    # GET is aliased to the POST responder only so that `wrk` can create
    # documents without being scripted to send POST requests.
    on_get_random = on_post_random
class MissingFunctionality:
    """Placeholder resource for functionality that is not written yet."""

    def on_get(self, req, resp):
        """Answer a GET with the plain-text note ``Patches welcome!``.

        The response status is ``falcon.HTTP_784``.
        """
        resp.status = falcon.HTTP_784
        resp.content_type = falcon.MEDIA_TEXT
        resp.text = 'Patches welcome!'
def _build_app(resource):
    """Create the Falcon app and register every route of this module.

    ``resource`` serves all three ``/documents`` routes; the ``suffix``
    argument selects its ``on_*_random`` / ``on_*_document`` responders.
    """
    api = falcon.App()
    api.add_route('/documents', resource)
    api.add_route('/documents/random', resource, suffix='random')
    api.add_route('/documents/{document_id}', resource, suffix='document')
    api.add_route('/not-implemented', MissingFunctionality())
    return api


# Both module-level names are kept; `app` is the object that the gunicorn
# command at the top of the file (`test:app`) loads.
documents = Documents()
app = _build_app(documents)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment