Skip to content

Instantly share code, notes, and snippets.

@ckuethe
Created May 19, 2017 22:44
Show Gist options
  • Save ckuethe/298e1e4a911c3f5042313a2a358b8897 to your computer and use it in GitHub Desktop.
Save ckuethe/298e1e4a911c3f5042313a2a358b8897 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# coding: utf-8
# Copyright (c) 2017 Chris Kuethe
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import pymongo as pm
import time
import uuid
import os
from getpass import getpass
import argparse
base_url = 'mongodb://{}:{}@localhost:27017/{}'
def dbcompact(conn, dbname=None):
'Compact the specified database, or all databases if unspecified'
if dbname:
dbs = [dbname]
else:
dbs = conn.database_names()
# don't mess with system databases
try:
dbs.remove('admin')
except ValueError:
pass
try:
dbs.remove('local')
except ValueError:
pass
if len(dbs) < 1:
return
for dbname in dbs:
for coll in conn[dbname].collection_names():
name = '{}/{}'.format(dbname,coll)
print '{:30s}'.format(name),
try:
t0 = time.time()
res = conn[dbname].command({'compact': coll})
print '{} {:.2f}s'.format(res['ok'] == 1.0, time.time() - t0)
except KeyboardInterrupt:
raise KeyboardInterrupt
except Exception, e:
print "Exception:", e
def make_user_private_database(mongo_client, dbname, userid=None, passwd=None, drop_existing=False):
'''
Creates a new database in the specified mongodb instance,
and adds a user with full privileges on that database. Also
create a table, and delete it as the new user to verify that
the user is properly permitted.
The username defaults to the name of the database.
The password default to a random string.
Returns a dict containing operation status
'''
global base_url
dblist = mongo_client.database_names()
if dbname in dblist:
if drop_existing:
status = mongo_client.drop_database(dbname)
else:
return {"Status": False, "Reason": "Database Already Exists"}
dbh = mongo_client[dbname]
# databases don't really exist until you write to them.
junktable = dbh['you_can_delete_this_collection']
status = junktable.insert_one({'foo': 1, 'bar': 2})
if userid is None:
userid = dbname
if passwd is None:
passwd = str(uuid.uuid4())
roles = [ {'role': 'userAdmin', 'db': dbname}, {'role': 'readWrite', 'db': dbname} ]
status = dbh.add_user( userid, passwd, roles = roles )
# this will throw an exception if the user is not properly permitted :)
testconn = pm.MongoClient(base_url.format(userid, passwd, dbname))
dbstats = testconn[dbname].command({'dbStats': 1})
testconn[dbname]['junk'].delete_many({})
testconn.close()
#print mongo_client[dbname].command({'dbStats': 1})
return {"Status": True, "dbname": dbname, "userid": userid, "passwd": passwd}
def destroy_user_private_database(mongo_client, dbname, userid=None):
'deletes a user private database, and its user'
userid = '{}.{}'.format(dbname,userid)
db_exists = dbname in mongo_client.database_names()
if userid is None:
userid = dbname
# XXX check to see if the target user is the db admin for that database?
user_exists = mongo_client.admin['system.users'].find({'_id': userid}).count()
res = mongo_client.admin['system.users'].delete_one({'_id': userid}).raw_result
res = mongo_client.drop_database(dbname)
return True
def fslurp(path):
with open(path) as fd:
return fd.read().strip()
def main():
global base_url
pwf = os.path.expanduser('~/.mongo_root_pw')
ap = argparse.ArgumentParser()
ap.add_argument('-d', '--dbname', dest='dbname', default=None, help='desired name of new database', required=True)
ap.add_argument('-u', '--userid', dest='userid', default=None, help='name of user who will own the database')
ap.add_argument('-p', '--passwd', dest='passwd', default=None, help='password for user who will own the database')
ap.add_argument('-f', '--force', dest='force', default=False, action='store_true', help='force creation or overwrite existing user/database')
ap.add_argument('-s', '--ssl', dest='ssl', default=False, action='store_true', help='use SSL')
ap.add_argument('-r', '--remove', dest='remove', default=False, action='store_true', help='remove the specified user/database')
ap.add_argument('-c', '--compact', dest='compact', default=False, action='store_true', help='compact the specified database, "*" for all')
ap.add_argument('-X', '--adminuser', dest='root_user', default='root', help='database administrator user')
ap.add_argument('-Y', '--adminpass', dest='root_pass', default=None, help='administrator password')
ap.add_argument('-Z', '--adminpassfile', dest='pwf', default=pwf, metavar='FILE', help='file containing administrator password')
args = ap.parse_args()
if args.ssl:
base_url = base_url + '?ssl=true&ssl_cert_reqs=CERT_NONE'
if args.root_pass is None:
if args.pwf and os.path.exists(args.pwf):
args.root_pass = fslurp(os.path.expanduser('~/.mongo_root_pw'))
else:
args.root_pass = getpass('MongoDB root password? ')
# this assumes that you have already turned on authentication
root_db = 'admin'
conn = pm.MongoClient(base_url.format(args.root_user, args.root_pass, root_db))
if args.compact:
if args.dbname == '*':
args.dbname = None
dbcompact(conn, args.dbname)
elif args.remove:
destroy_user_private_database(conn, args.dbname, args.userid)
else:
newdb = make_user_private_database(conn, args.dbname, args.userid, args.passwd, args.force)
if newdb['Status']:
print base_url.format(newdb['userid'], newdb['passwd'], newdb['dbname'])
else:
print newdb
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment