Created
November 14, 2017 15:20
-
-
Save cruftyoldsysadmin/b7330b1a9efa9f9ec536d56ba7334a27 to your computer and use it in GitHub Desktop.
First stab @ artifactory user management script.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python2 | |
import argparse | |
import json | |
import os | |
import pprint | |
import random | |
import re | |
import requests | |
import string | |
import sys | |
import boto3 | |
from botocore.exceptions import ClientError | |
# artifactory-user.py | |
# API wrapper to manage users on Artifactory | |
# | |
# * [Artifactory Security Configuration # API](https://www.jfrog.com/confluence/display/RTF/Security+Configuration+JSON) | |
# * [Artifactory REST API User management](https://www.jfrog.com/confluence/display/RTF/Artifactory+REST+API#ArtifactoryRESTAPI-CreateorReplaceUser) | |
# * Artifactory auth tokens: set up the auth token and encrypted password at https://cruftyold.jfrog.io/cruftyold/webapp/#/admin/security/users
# add_user() create (or replace) a user
#
def add_user(opts={}):
    """Create or replace an Artifactory user with a PUT request.

    Reads ``opts['api_url']`` and ``opts['name']`` to build the request URL
    and ``opts['headers']`` for the request headers.  Every other entry of
    ``opts`` is serialized to JSON and sent as the request body.

    Returns the ``requests`` response object.
    """
    request_url = opts['api_url'] + "/security/users/" + opts['name']
    # Connection settings are not part of the user record; in particular the
    # API key already travels in the headers and must not be echoed in the body.
    payload = dict(
        (key, value) for key, value in opts.items()
        if key not in ('api_url', 'api_key', 'headers')
    )
    return requests.put(request_url, json.dumps(payload), headers=opts['headers'])
# del_user() delete user
#
def del_user(opts={}):
    """Issue a DELETE for the user named ``opts['name']``.

    The URL is built from ``opts['api_url']`` and the request carries
    ``opts['headers']``.  Returns the ``requests`` response object.
    """
    endpoint = ''.join((opts['api_url'], '/security/users/', opts['name']))
    return requests.delete(endpoint, headers=opts['headers'])
# err() log errors to stderr
def err(s):
    """Write ``s`` followed by a newline to stderr; do nothing if ``s`` is falsy."""
    if not s:
        return
    line = '%s\n' % s
    sys.stderr.write(line)
# get_users() returns a list of users
#
def get_users(opts={}):
    """Fetch the user list from ``<api_url>/security/users``.

    Prints the response body and returns the ``requests`` response object.
    The request carries ``opts['headers']`` (which holds the API key) so it
    is authenticated the same way as the other calls in this script.
    """
    request_url = opts['api_url'] + "/security/users"
    users = requests.get(request_url, headers=opts['headers'])
    print(users.text)
    return users
# get_status() check if a user exists
#
def get_status(opts={}):
    """Look up the user named ``opts['name']`` and return the response body.

    On any status code above 299 an error is written to stderr and the
    process exits with code 22.  A 404 is reported as a missing user; any
    other failure is reported with its HTTP status so that problems such as
    bad credentials are not mistaken for "user does not exist".
    """
    request_url = opts['api_url'] + '/security/users/' + opts['name']
    req = requests.get(request_url, headers=opts['headers'])
    if req.status_code > 299:
        if req.status_code == 404:
            err('Error: User does not exist')
        else:
            err('Error: status lookup failed (HTTP %d)' % req.status_code)
        sys.exit(22)
    return req.text
# Generate a random password for the user
#
def gen_pass(length=20):
    """Return a random password of ``length`` ASCII letters and digits.

    Uses ``random.SystemRandom`` (backed by the operating system's entropy
    source) because the result is a credential; the default module-level
    generator is not suitable for secrets.
    """
    alphabet = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(length))
# merge_dicts(x,y)
# returns a dict created from two merged dicts
def merge_dicts(x, y):
    """Return a new dict holding the entries of ``x`` overlaid with those of ``y``.

    Neither argument is modified; on duplicate keys the value from ``y`` wins.
    """
    merged = dict(x)
    merged.update(y)
    return merged
# Simple Email Service config
def mail_config(opts=None, pwpush_url=None):
    """Build the settings dict consumed by send_email().

    opts        -- mapping providing 'email' (recipient) and 'profile'
                   (AWS profile name).  Defaults to the module-level
                   ``options`` so existing ``mail_config()`` calls keep working.
    pwpush_url  -- link to the pushed password, substituted into both mail
                   bodies.  Defaults to the module-level ``pwpush``.

    Returns a dict with the keys 'sender', 'aws_region', 'subject',
    'charset', 'recipient', 'body_text', 'body_html', 'configset' and
    'profile'.
    """
    # Fall back to the script's globals only when the caller passed nothing.
    if opts is None:
        opts = options
    if pwpush_url is None:
        pwpush_url = pwpush

    # Text body of the email.
    body_text = ("Artifactory access\r\n"
                 "Please click on {0} for your Password to Artifactory\r\n"
                 "Login to Artifactory at http://cruftyold.jfrog.io").format(pwpush_url)

    # HTML body of the email.
    body_html = """<html>
<head></head>
<body>
<h1>Artifactory access</h1>
<p> Please click on {0} for your Password to Artifactory</p>
<p> Login to Artifactory at <a
href='http://cruftyold.jfrog.io'>http://cruftyold.jfrog.io/</a></p>
</body>
</html>""".format(pwpush_url)

    return {
        'sender': 'cruftyoldsysadmin@gmail.com',
        'aws_region': 'us-east-1',
        'subject': 'Artifactory account creation',
        'charset': 'UTF-8',
        'recipient': opts['email'],
        'body_text': body_text,
        'body_html': body_html,
        'configset': 'mthtest',
        'profile': opts['profile'],
    }
# Parse commandline options
#
def parse_args(argv=None):
    """Parse the command line into an options dict.

    argv -- list of argument strings; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Parsing happens in two passes.  The first pass reads ``--api_key`` and
    the action flags (``--add``, ``--delete``, ``--list``, ``--status``) and
    ignores everything else.  The second pass reads the arguments the chosen
    action needs: the full user description for ``--add``, nothing for
    ``--list``, and ``--name`` otherwise.

    Returns a dict of all parsed values plus 'api_url' and 'headers'.
    argparse exits the process when a required argument is missing.
    """
    p = argparse.ArgumentParser(description='Manage users in Artifactory')

    # Global parameters
    p.add_argument('--api_key', required=True, type=str)

    # Script actions
    for action in ('add', 'delete', 'list', 'status'):
        p.add_argument("--{0}".format(action), required=False, action='store_true')

    # parse_known_args() leaves the action-specific arguments for the second pass
    args1, _ = p.parse_known_args(argv)
    options = vars(args1)
    options['api_url'] = 'https://cruftyold.jfrog.io/cruftyold/api'
    options['headers'] = {
        'X-JFrog-Art-Api': options['api_key'],
        'Content-Type': 'application/json',
    }

    if options['add']:
        p1 = argparse.ArgumentParser()
        # Required arguments for user additions
        for arg in ('email', 'groups', 'profile', 'name'):
            p1.add_argument("--{0}".format(arg), required=True, type=str)
        # Optional arguments for user additions
        for arg in ('admin', 'profileUpdatable', 'disableUIaccess', 'internalPasswordDisabled'):
            p1.add_argument("--{0}".format(arg), action='store_true')
    elif options['list']:
        # Listing needs nothing beyond the global parameters
        return options
    else:
        # delete / status (and no action at all) operate on a single user
        p1 = argparse.ArgumentParser()
        p1.add_argument('--name', required=True, type=str)

    args2, _ = p1.parse_known_args(argv)
    options.update(vars(args2))
    return options
# pw_push() Pushes password to pwpush.com
#
def pw_push(password):
    """Upload ``password`` to pwpush.com and return the URL of the push.

    The push is requested to expire after 5 days or 5 views.  Raises
    ``requests.exceptions.HTTPError`` if the service answers with an error
    status, instead of failing later with an opaque JSON/KeyError.
    """
    response = requests.post(
        'https://pwpush.com/p.json',
        data={
            'password[payload]': password,
            # The API field is plural; the singular spelling is not recognised.
            'password[expire_after_days]': 5,
            'password[expire_after_views]': 5,
        },
        # Never hang the script on an unresponsive service
        timeout=30,
    )
    response.raise_for_status()
    return "https://pwpush.com/p/" + response.json()['url_token']
# Stolen from http://docs.aws.amazon.com/ses/latest/DeveloperGuide/send-using-sdk-python.html
def send_email(opts={}):
    """Send the account-creation mail through Amazon SES.

    Reads from ``opts``: 'profile' (AWS profile name), 'recipient',
    'sender', 'subject', 'charset', 'body_html' and 'body_text'.

    Returns the SES response dict on success.  On a ``ClientError`` the
    error message is printed and ``None`` is returned.
    """
    # NOTE(review): opts may also carry 'aws_region' and 'configset' (see
    # mail_config); they are not used here, so the region comes from the
    # AWS profile -- confirm that is intended.
    try:
        session = boto3.Session(profile_name=opts['profile'])
        mailclient = session.client('ses')
        response = mailclient.send_email(
            Destination={
                'ToAddresses': [opts['recipient']],
            },
            Message={
                'Body': {
                    'Html': {
                        'Charset': opts['charset'],
                        'Data': opts['body_html'],
                    },
                    'Text': {
                        'Charset': opts['charset'],
                        'Data': opts['body_text'],
                    },
                },
                'Subject': {
                    'Charset': opts['charset'],
                    'Data': opts['subject'],
                },
            },
            Source=opts['sender'],
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
        return None
    # Report the id of the message itself, matching the printed label
    print("Email sent! Message ID: {0}".format(response['MessageId']))
    return response
# notify_slack()
# Stolen from slacknotify in infra-tools
def notify_slack(opts={}):
    """POST ``opts['data']`` as JSON to the webhook URL in ``opts['slack_webhook']``.

    Exits the process with an error message unless the response body is
    exactly "ok".
    """
    webhook = opts['slack_webhook']
    payload = json.dumps(opts['data'])
    reply = requests.post(
        webhook,
        data=payload,
        headers={'Content-type': 'application/json', 'Accept': 'text/plain'},
    )
    if reply.text == "ok":
        return
    sys.exit("Message failed to send: {0}".format(reply.text))
def user_config(opts={}):
    """Build the request configuration for a user from parsed options.

    Copies the user fields present in ``opts`` ('name', 'groups', 'email',
    'admin', 'profileUpdatable', 'disableUIaccess',
    'internalPasswordDisabled', 'api_key'), converting booleans to the
    strings 'true'/'false' and the comma-separated 'groups' value to a list.
    Fields missing from ``opts`` are skipped, so the function also works for
    actions that carry no user description (e.g. listing).

    Also sets 'api_url', a freshly generated 'password' and the request
    'headers'.  Returns the resulting dict.
    """
    config = {}
    for opt in ('name', 'groups', 'email', 'admin', 'profileUpdatable',
                'disableUIaccess', 'internalPasswordDisabled', 'api_key'):
        if opt not in opts:
            continue
        value = opts[opt]
        if isinstance(value, bool):
            # converts bools to strings
            value = str(value).lower()
        elif opt == 'groups':
            # converts groups csv to array, tolerating spaces around the commas
            value = [group.strip() for group in value.split(',') if group.strip()]
        # The command-line flag is spelled 'disableUIaccess', but the field in
        # Artifactory's user JSON is 'disableUIAccess' (capital A).
        key = 'disableUIAccess' if opt == 'disableUIaccess' else opt
        config[key] = value
    config['api_url'] = opts['api_url']
    config['password'] = gen_pass()
    config['headers'] = {'X-JFrog-Art-Api': opts['api_key'], 'Content-Type': 'application/json'}
    return config
# valid_email() Tests that provided email is a valid email address
#
def valid_email(email):
    """Return True if ``email`` is a string shaped like an email address, else False.

    The check is a pattern match only (local part, ``@``, dotted domain and
    an alphabetic top-level domain of at least two letters).  Matching is
    case-insensitive.
    """
    if not isinstance(email, str):
        return False
    pattern = r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,63})$'
    # bool() so a failed match yields False rather than None
    return bool(re.match(pattern, email, re.IGNORECASE))
# Do all the things
#
# Runs only when the file is executed as a script, so importing it has no
# side effects.  The names below stay module-level globals on purpose:
# mail_config() reads `options` and `pwpush` when called without arguments.
if __name__ == '__main__':
    # Creates an options hash from parse_args()
    options = parse_args()

    if options['add']:
        config = user_config(options)
        user = add_user(config)
        # Do not push or mail a password for an account that was not created
        if user.status_code > 299:
            err('Error: user creation failed (HTTP %d): %s' % (user.status_code, user.text))
            sys.exit(1)
        pwpush = pw_push(config['password'])
        mailconfig = mail_config()
        mailresp = send_email(mailconfig)
        print(pwpush)
        pprint.pprint(user)
    elif options['delete']:
        user = del_user(options)
        pprint.pprint(user)
    elif options['list']:
        # `options` already holds api_url and headers; the list action has no
        # user fields, so it must not go through user_config().
        users = get_users(options)
        pprint.pprint(users)
    elif options['status']:
        status = get_status(options)
        print(status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment