Skip to content

Instantly share code, notes, and snippets.

@ytjohn
Created November 21, 2022 16:04
Show Gist options
  • Save ytjohn/e090e32e49027b50d0a881953dd719da to your computer and use it in GitHub Desktop.
Save ytjohn/e090e32e49027b50d0a881953dd719da to your computer and use it in GitHub Desktop.
#!/opt/stackstorm/st2/bin/python
import pymongo
import mongoengine
from pymongo.errors import OperationFailure
from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
import logging
# Set the root logger to DEBUG (presumably so library debug records are not filtered out).
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
from oslo_config import cfg
# import st2common.config as common_config
from st2api import config
# Register the st2api options before cfg.CONF.database is read further down.
# NOTE(review): this runs before st2common.models.db is imported - confirm whether that order is required.
config.register_opts(ignore_errors=True)
import st2common.models.db as db
# Also set the LOG object exposed by st2common.models.db to DEBUG.
db.LOG.setLevel(logging.DEBUG)
# Target database: name first, then where to reach it.
db_name = "st2"
db_host = "mongo"
db_port = 27017
# No credentials are supplied; both values are passed to the driver as None.
username, password = None, None
# Timeout in milliseconds.
connection_timeout = 3000
# No extra SSL or compressor options are forwarded to the driver.
ssl_kwargs = {}
compressor_kwargs = {}
# First attempt: connect through the private helper in st2common.models.db.
print("running _db_connect")
connection = db._db_connect(db_name, db_host, db_port)
# Second attempt: connect directly through mongoengine under the alias "foo".
# The timeout comes from the registered configuration and is applied to both
# the connect and the server-selection phase. Per the original note, the
# driver default for server selection is 30 seconds, which would block that
# long before failing on SSL-related or other errors.
connection_timeout = cfg.CONF.database.connection_timeout
connect_options = {
    "host": db_host,
    "port": db_port,
    "tz_aware": True,
    "alias": "foo",
    "username": username,
    "password": password,
    "connectTimeoutMS": connection_timeout,
    "serverSelectionTimeoutMS": connection_timeout,
    **ssl_kwargs,
    **compressor_kwargs,
}
connection = mongoengine.connection.connect(db_name, **connect_options)
# NOTE: Since pymongo 3.0, connect() method is lazy and not blocking (always returns success)
# so we need to issue a command / query to check if connection has been
# successfully established.
# See http://api.mongodb.com/python/current/api/pymongo/mongo_client.html for details
try:
    # The ping command is cheap and does not require auth
    # https://www.mongodb.com/community/forums/t/how-to-use-the-new-hello-interface-for-availability/116748/
    connection.admin.command("ping")
except (ConnectionFailure, ServerSelectionTimeoutError):
    # NOTE: ServerSelectionTimeoutError can also be thrown if SSLHandShake fails in the server
    # Sadly the client doesn't include more information about the error so in such scenarios
    # user needs to check MongoDB server log
    print(f'Failed to connect to database "{db_name}" @ "{db_host}:{db_port}" as user "{username}".')
    # Bare raise re-raises the active exception with its original traceback.
    raise
# Reaching this point means the ping above succeeded.
print(f'Successfully connected to database "{db_name}" @ "{db_host}:{db_port}" as user "{username}".')
# Select the database by the configured name instead of a hard-coded attribute,
# and bind it to its own name so the imported `db` module alias is not shadowed.
database = connection[db_name]
# Write one document through plain pymongo to confirm the connection is usable.
print("Inserting one using pymongo")
inserted = database.my_collection.insert_one({"x": 10}).inserted_id
print(f"inserted id of {inserted}")
# connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment