Skip to content

Instantly share code, notes, and snippets.

@puentesarrin
Forked from cordis/access_mongo_randomly.py
Last active August 25, 2016 07:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save puentesarrin/858dbb8e4b864f81d56012ac94cd0a68 to your computer and use it in GitHub Desktop.
Save puentesarrin/858dbb8e4b864f81d56012ac94cd0a68 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Script bootstrap: extend sys.path, select the Django settings module,
# optionally create a raven client, then call the project's boot() before
# importing the remaining modules. Statement order matters here.
import os
import sys
import raven
# Directory containing this script; the paths below are resolved relative to it.
CURRENT = os.path.dirname(__file__)
# Put the directory two levels up, and its 'rebelmouse' subdirectory, at the
# front of the import search path.
sys.path.insert(0, os.path.realpath(os.path.join(CURRENT, '..', '..')))
sys.path.insert(1, os.path.realpath(os.path.join(CURRENT, '..', '..', 'rebelmouse')))
os.environ['DJANGO_SETTINGS_MODULE'] = 'rebelmouse.settings'
# A raven client is only created when SENTRY_DSN is present in the environment.
if os.getenv('SENTRY_DSN'):
    raven.Client(transport=raven.transport.http.HTTPTransport)
# NOTE(review): 'rm' is presumably found through the sys.path entries added
# above, and boot() presumably has to run before the imports below — confirm.
from rm.boot import boot
boot()
# Code follows below
from warnings import warn
from random import choice
from faker import Faker
from rm.rmmongo import RmMongo
# Name of the collection that every helper in this script reads and writes.
COLLECTION = 'test_collection'
def main():
    """Insert documents and randomly re-read them in an endless loop.

    Drops ``COLLECTION`` once, then repeats: insert one fake document, pick a
    random id among the documents inserted successfully so far, look it up
    and print the result. The loop has no exit condition; stop the script
    externally.

    Only ids of successful inserts are remembered. ``insert_document`` returns
    ``None`` when the insert fails; storing those ``None`` values would leave
    them in the list forever and make a growing share of later iterations
    skip the lookup.
    """
    mongo = RmMongo.get_instance('mongo-test', read_from_master=False)
    drop_collection(mongo)
    doc_id_list = []
    while True:
        # NOTE(review): the handle is re-fetched before each operation;
        # presumably intentional for this test — confirm before hoisting.
        mongo = RmMongo.get_instance('mongo-test', read_from_master=False)
        inserted_id = insert_document(mongo)
        if inserted_id is not None:
            doc_id_list.append(inserted_id)
        if not doc_id_list:
            # Nothing has been inserted successfully yet; choice() would
            # raise IndexError on an empty list, so try inserting again.
            continue
        doc_id = choice(doc_id_list)
        # A parenthesized single-argument print produces the same output under
        # the Python 2 print statement and the Python 3 print function.
        print('Finding {} ...'.format(str(doc_id)))
        mongo = RmMongo.get_instance('mongo-test', read_from_master=False)
        doc = find_document(mongo, doc_id)
        print('Found {}'.format(doc))
def drop_collection(mongo):
    """Drop the ``COLLECTION`` collection reached through ``mongo``."""
    target = mongo[COLLECTION]
    target.drop()
def insert_document(mongo):
    """Insert one document with a fake name and phone number into ``COLLECTION``.

    Returns the ``inserted_id`` of the insert result, or ``None`` when the
    insert raised an ``Exception`` (see ``call_or_return_none``).
    """
    fake = Faker()
    record = {
        'name': fake.name(),
        'phone': fake.phone_number(),
    }

    def do_insert():
        return mongo[COLLECTION].insert_one(record).inserted_id

    return call_or_return_none(do_insert)
def find_document(mongo, doc_id):
    """Look up the document whose ``_id`` equals ``doc_id`` in ``COLLECTION``.

    Returns whatever ``find_one`` returns, or ``None`` when the lookup raised
    an ``Exception`` (see ``call_or_return_none``).
    """
    query = {'_id': doc_id}

    def do_find():
        return mongo[COLLECTION].find_one(query)

    return call_or_return_none(do_find)
def call_or_return_none(func):
    """Call ``func`` with no arguments and return its result.

    If the call raises an ``Exception``, its ``str()`` is emitted through
    ``warn`` as a ``RuntimeWarning`` and ``None`` is returned instead.
    """
    outcome = None
    try:
        outcome = func()
    except Exception as error:
        warn(str(error), RuntimeWarning)
    return outcome
# Start the insert/find loop only when this file is executed as a script.
if __name__ == '__main__':
    main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Script bootstrap: extend sys.path, select the Django settings module,
# optionally create a raven client, then call the project's boot() before
# importing the remaining modules. Statement order matters here.
import os
import sys
import raven
# Directory containing this script; the paths below are resolved relative to it.
CURRENT = os.path.dirname(__file__)
# Put the directory two levels up, and its 'rebelmouse' subdirectory, at the
# front of the import search path.
sys.path.insert(0, os.path.realpath(os.path.join(CURRENT, '..', '..')))
sys.path.insert(1, os.path.realpath(os.path.join(CURRENT, '..', '..', 'rebelmouse')))
os.environ['DJANGO_SETTINGS_MODULE'] = 'rebelmouse.settings'
# A raven client is only created when SENTRY_DSN is present in the environment.
if os.getenv('SENTRY_DSN'):
    raven.Client(transport=raven.transport.http.HTTPTransport)
# NOTE(review): 'rm' is presumably found through the sys.path entries added
# above, and boot() presumably has to run before the imports below — confirm.
from rm.boot import boot
boot()
# Code follows below
from warnings import warn
from random import choice
import django.conf as conf
from faker import Faker
import mongoengine
# Name of the collection that every helper in this script reads and writes.
COLLECTION = 'test_collection'
def main():
    """Insert documents and randomly re-read them in an endless loop.

    Registers the 'mongo-test' connection, drops ``COLLECTION`` once, then
    repeats: insert one fake document, pick a random id among the documents
    inserted successfully so far, look it up and print the result. The loop
    has no exit condition; stop the script externally.

    Only ids of successful inserts are remembered. ``insert_document`` returns
    ``None`` when the insert fails; storing those ``None`` values would leave
    them in the list forever and make a growing share of later iterations
    skip the lookup.
    """
    register_connection()
    mongo = mongoengine.connection.get_db('mongo-test')
    drop_collection(mongo)
    doc_id_list = []
    while True:
        # NOTE(review): the database handle is re-fetched before each
        # operation; presumably intentional for this test — confirm before
        # hoisting.
        mongo = mongoengine.connection.get_db('mongo-test')
        inserted_id = insert_document(mongo)
        if inserted_id is not None:
            doc_id_list.append(inserted_id)
        if not doc_id_list:
            # Nothing has been inserted successfully yet; choice() would
            # raise IndexError on an empty list, so try inserting again.
            continue
        doc_id = choice(doc_id_list)
        # A parenthesized single-argument print produces the same output under
        # the Python 2 print statement and the Python 3 print function.
        print('Finding {} ...'.format(str(doc_id)))
        mongo = mongoengine.connection.get_db('mongo-test')
        doc = find_document(mongo, doc_id)
        print('Found {}'.format(doc))
def register_connection():
    """Register the 'mongo-test' mongoengine alias from Django settings.

    Reads ``host`` and ``db`` from ``settings.MONGO['mongo-test']`` and passes
    a ``mongodb://<host>/<db>`` URI built from them as the connection host.
    """
    cfg = conf.settings.MONGO['mongo-test']
    uri = 'mongodb://{}/{}'.format(cfg['host'], cfg['db'])
    mongoengine.register_connection(alias='mongo-test', host=uri)
def drop_collection(mongo):
    """Drop the ``COLLECTION`` collection reached through ``mongo``."""
    target = mongo[COLLECTION]
    target.drop()
def insert_document(mongo):
    """Insert one document with a fake name and phone number into ``COLLECTION``.

    Returns the ``inserted_id`` of the insert result, or ``None`` when the
    insert raised an ``Exception`` (see ``call_or_return_none``).
    """
    fake = Faker()
    record = {
        'name': fake.name(),
        'phone': fake.phone_number(),
    }

    def do_insert():
        return mongo[COLLECTION].insert_one(record).inserted_id

    return call_or_return_none(do_insert)
def find_document(mongo, doc_id):
    """Look up the document whose ``_id`` equals ``doc_id`` in ``COLLECTION``.

    Returns whatever ``find_one`` returns, or ``None`` when the lookup raised
    an ``Exception`` (see ``call_or_return_none``).
    """
    query = {'_id': doc_id}

    def do_find():
        return mongo[COLLECTION].find_one(query)

    return call_or_return_none(do_find)
def call_or_return_none(func):
    """Call ``func`` with no arguments and return its result.

    If the call raises an ``Exception``, its ``str()`` is emitted through
    ``warn`` as a ``RuntimeWarning`` and ``None`` is returned instead.
    """
    outcome = None
    try:
        outcome = func()
    except Exception as error:
        warn(str(error), RuntimeWarning)
    return outcome
# Start the insert/find loop only when this file is executed as a script.
if __name__ == '__main__':
    main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Script bootstrap: extend sys.path, select the Django settings module,
# optionally create a raven client, then call the project's boot() before
# importing the remaining modules. Statement order matters here.
import os
import sys
import raven
# Directory containing this script; the paths below are resolved relative to it.
CURRENT = os.path.dirname(__file__)
# Put the directory two levels up, and its 'rebelmouse' subdirectory, at the
# front of the import search path.
sys.path.insert(0, os.path.realpath(os.path.join(CURRENT, '..', '..')))
sys.path.insert(1, os.path.realpath(os.path.join(CURRENT, '..', '..', 'rebelmouse')))
os.environ['DJANGO_SETTINGS_MODULE'] = 'rebelmouse.settings'
# A raven client is only created when SENTRY_DSN is present in the environment.
if os.getenv('SENTRY_DSN'):
    raven.Client(transport=raven.transport.http.HTTPTransport)
# NOTE(review): 'rm' is presumably found through the sys.path entries added
# above, and boot() presumably has to run before the imports below — confirm.
from rm.boot import boot
boot()
# Code follows below
from warnings import warn
from random import choice
import django.conf as conf
from faker import Faker
import mongoengine
import pymongo
# Name of the collection that every helper in this script reads and writes.
COLLECTION = 'test_collection'
def main():
    """Insert documents and randomly re-read them in an endless loop.

    Registers the 'mongo-test' connection, drops ``COLLECTION`` once, then
    repeats: insert one fake document, pick a random id among the documents
    inserted successfully so far, look it up and print the result. The loop
    has no exit condition; stop the script externally.

    Only ids of successful inserts are remembered. ``insert_document`` returns
    ``None`` when the insert fails; storing those ``None`` values would leave
    them in the list forever and make a growing share of later iterations
    skip the lookup.
    """
    register_connection()
    mongo = mongoengine.connection.get_db('mongo-test')
    drop_collection(mongo)
    doc_id_list = []
    while True:
        # NOTE(review): the database handle is re-fetched before each
        # operation; presumably intentional for this test — confirm before
        # hoisting.
        mongo = mongoengine.connection.get_db('mongo-test')
        inserted_id = insert_document(mongo)
        if inserted_id is not None:
            doc_id_list.append(inserted_id)
        if not doc_id_list:
            # Nothing has been inserted successfully yet; choice() would
            # raise IndexError on an empty list, so try inserting again.
            continue
        doc_id = choice(doc_id_list)
        # A parenthesized single-argument print produces the same output under
        # the Python 2 print statement and the Python 3 print function.
        print('Finding {} ...'.format(str(doc_id)))
        mongo = mongoengine.connection.get_db('mongo-test')
        doc = find_document(mongo, doc_id)
        print('Found {}'.format(doc))
def register_connection():
    """Register the 'mongo-test' mongoengine alias for a replica set.

    Reads ``replica_host``, ``db`` and ``replica_name`` from
    ``settings.MONGO['mongo-test']``. The connection host is a
    ``mongodb://<replica_host>/<db>`` URI, the replica set name is passed as
    ``replicaSet`` and the read preference is ``SECONDARY_PREFERRED``.
    """
    cfg = conf.settings.MONGO['mongo-test']
    uri = 'mongodb://{}/{}'.format(cfg['replica_host'], cfg['db'])
    mongoengine.register_connection(
        alias='mongo-test',
        host=uri,
        replicaSet=cfg['replica_name'],
        read_preference=pymongo.ReadPreference.SECONDARY_PREFERRED
    )
def drop_collection(mongo):
    """Drop the ``COLLECTION`` collection reached through ``mongo``."""
    target = mongo[COLLECTION]
    target.drop()
def insert_document(mongo):
    """Insert one document with a fake name and phone number into ``COLLECTION``.

    Returns the ``inserted_id`` of the insert result, or ``None`` when the
    insert raised an ``Exception`` (see ``call_or_return_none``).
    """
    fake = Faker()
    record = {
        'name': fake.name(),
        'phone': fake.phone_number(),
    }

    def do_insert():
        return mongo[COLLECTION].insert_one(record).inserted_id

    return call_or_return_none(do_insert)
def find_document(mongo, doc_id):
    """Look up the document whose ``_id`` equals ``doc_id`` in ``COLLECTION``.

    Returns whatever ``find_one`` returns, or ``None`` when the lookup raised
    an ``Exception`` (see ``call_or_return_none``).
    """
    query = {'_id': doc_id}

    def do_find():
        return mongo[COLLECTION].find_one(query)

    return call_or_return_none(do_find)
def call_or_return_none(func):
    """Call ``func`` with no arguments and return its result.

    If the call raises an ``Exception``, its ``str()`` is emitted through
    ``warn`` as a ``RuntimeWarning`` and ``None`` is returned instead.
    """
    outcome = None
    try:
        outcome = func()
    except Exception as error:
        warn(str(error), RuntimeWarning)
    return outcome
# Start the insert/find loop only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment