Skip to content

Instantly share code, notes, and snippets.

@cruftyoldsysadmin
Created November 14, 2017 15:20
Show Gist options
  • Save cruftyoldsysadmin/b7330b1a9efa9f9ec536d56ba7334a27 to your computer and use it in GitHub Desktop.
Save cruftyoldsysadmin/b7330b1a9efa9f9ec536d56ba7334a27 to your computer and use it in GitHub Desktop.
First stab @ artifactory user management script.
#!/usr/local/bin/python2
import argparse
import json
import os
import pprint
import random
import re
import requests
import string
import sys
import boto3
from botocore.exceptions import ClientError
# artifactory-user.py
# API wrapper to manage users on Artifactory
#
# * [Artifactory Security Configuration # API](https://www.jfrog.com/confluence/display/RTF/Security+Configuration+JSON)
# * [Artifactory REST API User management](https://www.jfrog.com/confluence/display/RTF/Artifactory+REST+API#ArtifactoryRESTAPI-CreateorReplaceUser)
# * [Artifactory Auth Tokens](Setup authtoken and encpass at https://cruftyold.jfrog.io/cruftyold/webapp/#/admin/security/users)
# add_user()
#
def add_user(opts={}):
request_url = opts['api_url'] + "/security/users/" + opts['name']
data = json.dumps(opts)
user = requests.put(request_url, data, headers=opts['headers'])
return user
# del_user() delete user
#
def del_user(opts={}):
request_url = opts['api_url'] + '/security/users/' + opts['name']
user = requests.delete(request_url, headers=opts['headers'])
return user
# err() log errors to stderr
def err(s):
if s:
sys.stderr.write('%s\n' % s)
# get_users() returns a list of users
#
def get_users(opts={}):
request_url = opts['api_url'] + "/security/users"
users = requests.get(request_url)
print users.text
return users
# get_status() check if a user exists
#
def get_status(opts={}):
request_url = opts['api_url'] + '/security/users/' + opts['name']
req = requests.get(request_url, headers=opts['headers'])
if req.status_code > 299:
err('Error: User does not exist')
sys.exit(22)
else:
return req.text
# Generate a random password for the user
#
def gen_pass():
return ''.join([random.choice(string.ascii_letters + string.digits) for n in xrange(20)])
# merge_dicts(x,y)
# returns a dict created from two merged dicts
def merge_dicts(x, y):
z = {}
z.update(x)
z.update(y)
return z
# Simple Email Service config
def mail_config():
config = {
'sender' : 'cruftyoldsysadmin@gmail.com',
'aws_region' : 'us-east-1',
'subject' : 'Artifactory account creation',
'charset' : 'UTF-8',
'recipient' : options['email'],
# Text body of the email.
'body_text' : ("Artifactory access\r\n"
"Please click on {0} for your Password to Artifactory\r\n"
"Login to Artifactory at http://cruftyold.jfrog.io").format(pwpush),
# HTML body of the email.
'body_html' : """<html>
<head></head>
<body>
<h1>Artifactory access</h1>
<p> Please click on {0} for your Password to Artifactory</p>
<p> Login to Artifactory at <a
href='http://cruftyold.jfrog.io'>http://cruftyold.jfrog.io/</a></p>
</body>
</html>""".format(pwpush),
'configset' : 'mthtest',
'profile' : options['profile']
}
return config
# Parse commandline options
#
def parse_args():
p = argparse.ArgumentParser(description='Mange users in Artifactory')
# Global parameters
p.add_argument('--api_key', required=True, type=str)
# Script actions
for arg in ['add', 'delete', 'list', 'status']:
p.add_argument("--{0}".format(arg), required=False,action='store_true')
# Splits defined and undefined arguments into separate hashes
[args1,extras1] = p.parse_known_args()
# Create a hash of arguments & values
actions = vars(args1)
actions['api_url'] = 'https://cruftyold.jfrog.io/cruftyold/api'
# Parses command arguments required for user additions
#
if actions['add']:
p1=argparse.ArgumentParser()
# Required arguments for user additions
for arg in ['email', 'groups', 'profile', 'name']:
p1.add_argument("--{0}".format(arg), required=True, type=str)
# Optional arguments for user additions
for arg in ['admin', 'profileUpdatable', 'disableUIaccess', 'internalPasswordDisabled']:
p1.add_argument("--{0}".format(arg), action='store_true')
# Splits defined and undefined arguments into separate hashes
[args2,extras2] = p1.parse_known_args()
options = merge_dicts(actions,vars(args2))
options['headers'] = { 'X-JFrog-Art-Api' : options['api_key'], 'Content-Type' : 'application/json' }
return options
elif actions['list']:
options = actions
options['headers'] = { 'X-JFrog-Art-Api' : options['api_key'], 'Content-Type' : 'application/json' }
return options
else:
p1=argparse.ArgumentParser()
p1.add_argument('--name', required=True, type=str)
[args2,extras2] = p1.parse_known_args()
options = merge_dicts(actions, vars(args2))
options['headers'] = { 'X-JFrog-Art-Api' : options['api_key'], 'Content-Type' : 'application/json' }
return options
# pw_push() Pushes password to pwpush.com
#
def pw_push(password):
pwp_token = requests.post('https://pwpush.com/p.json', data =
{
'password[payload]' : password,
'password[expire_after_day]' : 5,
'password[expire_after_views]' : 5,
}).json()['url_token']
pwp_url = "https://pwpush.com/p/" + pwp_token
return pwp_url
# Stolen from http://docs.aws.amazon.com/ses/latest/DeveloperGuide/send-using-sdk-python.html
def send_email(opts={}):
try:
session = boto3.Session(profile_name=opts['profile'])
mailclient = session.client('ses')
response = mailclient.send_email(
Destination = {
'ToAddresses': [ opts['recipient'] ],
},
Message = {
'Body': {
'Html': {
'Charset' : opts['charset'],
'Data' : opts['body_html'],
},
'Text': {
'Charset' : opts['charset'],
'Data' : opts['body_text'],
},
},
'Subject': {
'Charset' : opts['charset'],
'Data' : opts['subject'],
},
},
Source = opts['sender'],
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
print("Email sent! Message ID:"),
print(response['ResponseMetadata']['RequestId'])
# notify_slack()
# Stolen from slacknotify in infra-tools
def notify_slack(opts={}):
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
request = requests.post(opts['slack_webhook'], data=json.dumps(opts['data']), headers=headers)
if request.text != "ok":
sys.exit("Message failed to send: {0}".format(request.text))
def user_config(opts={}):
config = {}
for opt in [ 'name', 'groups', 'email', 'admin', 'profileUpdatable', 'disableUIaccess', 'internalPasswordDisabled', 'api_key' ]:
# converts bools to strings
if type(opts[opt]) is bool:
config[opt] = str(opts[opt]).lower()
# converts groups csv to array
elif opt == 'groups':
config[opt] = opts[opt].split(',')
else:
config[opt] = opts[opt]
config['api_url'] = opts['api_url']
config['password'] = gen_pass()
config['headers'] = { 'X-JFrog-Art-Api' : opts['api_key'], 'Content-Type' : 'application/json' }
return config
# valid_email() Tests that provided email is a valid email address
#
def valid_email(email):
if not isinstance(email, str):
return False
elif re.match('^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', email):
return True
# Creates an options hash from parse_args()
options=parse_args()
# Do all the things
#
if options['add']:
config = user_config(options)
user = add_user(config)
pwpush = pw_push(config['password'])
mailconfig = mail_config()
mailresp = send_email(mailconfig)
print pwpush
pprint.pprint(user)
elif options['delete']:
user = del_user(options)
pprint.pprint(user)
elif options['list']:
config = user_config(options)
users = get_users(config)
pprint.pprint(users)
elif options['status']:
status = get_status(options)
print status
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment