Skip to content

Instantly share code, notes, and snippets.

@nmcspadden
Created October 19, 2016 03:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nmcspadden/22cd99195361167284d68972ab48f80c to your computer and use it in GitHub Desktop.
Save nmcspadden/22cd99195361167284d68972ab48f80c to your computer and use it in GitHub Desktop.
adobe_tools (Complete)
#!/usr/bin/python
"""Adobe API tools."""
import sys
import time
import json
import os
try:
import jwt
import requests
except ImportError:
sys.exit(0)
if sys.version_info[0] == 2:
from ConfigParser import RawConfigParser
from urllib import urlencode
from urllib import quote
if sys.version_info[0] >= 3:
from configparser import RawConfigParser
from urllib.parse import urlencode
# INTERNAL / PRIVATE ACTIONS
def _product_list(config_data, access_token):
"""Get the list of product configurations."""
page = 0
result = {}
productlist = []
while result.get('lastPage', False) is not True:
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/groups/" + config_data['org_id'] + "/" + str(page)
res = requests.get(url, headers=headers(config_data, access_token))
if res.status_code == 200:
# print(res.status_code)
# print(res.headers)
# print(res.text)
result = json.loads(res.text)
productlist += result.get('groups', [])
page += 1
return productlist
def _user_list(config_data, access_token):
"""Get a list of all users."""
page = 0
result = {}
userlist = []
while result.get('lastPage', False) is not True:
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/users/" + config_data['org_id'] + "/" + str(page)
res = requests.get(url, headers=headers(config_data, access_token))
if res.status_code == 200:
# print(res.status_code)
# print(res.headers)
# print(res.text)
result = json.loads(res.text)
userlist += result.get('users', [])
page += 1
return userlist
def _users_of_product(config_data, product_config_name, access_token):
"""Get a list of users of a specific configuration."""
page = 0
result = {}
userlist = []
while result.get('lastPage', False) is not True:
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/users/" + config_data['org_id'] + "/" + str(page) + "/" + \
quote(product_config_name)
res = requests.get(url, headers=headers(config_data, access_token))
if res.status_code == 200:
# print(res.status_code)
# print(res.headers)
# print(res.text)
result = json.loads(res.text)
userlist += result.get('users', [])
page += 1
return userlist
def _add_product_to_user(config_data, products, user, access_token):
"""Add product config to user."""
add_dict = {
'user': user,
'do': [
{
'add': {
'product': products
}
}
]
}
body = json.dumps([add_dict])
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/action/" + config_data['org_id']
res = requests.post(
url,
headers=headers(config_data, access_token),
data=body
)
if res.status_code != 200:
print(res.status_code)
print(res.headers)
print(res.text)
else:
results = json.loads(res.text)
if results.get('notCompleted') == 1:
print("Not completed!")
print(results.get('errors'))
return False
if results.get('completed') == 1:
print("Completed!")
return True
def _user_data(config_data, access_token, username):
"""Get the data for a given user."""
userlist = _user_list(config_data, access_token)
for user in userlist:
if user['email'] == username:
return user
return {}
def _products_per_user(config_data, access_token, username):
"""Return a list of products assigned to user."""
user_info = _user_data(config_data, access_token, username)
return user_info.get('groups', [])
def _remove_product_from_user(config_data, products, user, access_token):
"""Remove products from user."""
add_dict = {
'user': user,
'do': [
{
'remove': {
'product': products
}
}
]
}
body = json.dumps([add_dict])
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/action/" + config_data['org_id']
res = requests.post(
url,
headers=headers(config_data, access_token),
data=body
)
if res.status_code != 200:
print(res.status_code)
print(res.headers)
print(res.text)
else:
results = json.loads(res.text)
if results.get('notCompleted') == 1:
print("Not completed!")
print(results.get('errors'))
return False
if results.get('completed') == 1:
print("Completed!")
return True
def _add_federated_user(
config_data, access_token, email, country, firstname, lastname
):
"""Add user to domain."""
add_dict = {
'user': email,
'do': [
{
'createFederatedID': {
'email': email,
'country': country,
'firstname': firstname,
'lastname': lastname,
}
}
]
}
body = json.dumps([add_dict])
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/action/" + config_data['org_id']
res = requests.post(
url,
headers=headers(config_data, access_token),
data=body
)
if res.status_code != 200:
print(res.status_code)
print(res.headers)
print(res.text)
else:
results = json.loads(res.text)
if results.get('notCompleted') == 1:
print("Not completed!")
print(results.get('errors'))
return False
if results.get('completed') == 1:
print("Completed!")
return True
def _remove_user_from_org(config_data, access_token, user):
"""Remove user from organization."""
add_dict = {
'user': user,
'do': [
{
'removeFromOrg': {}
}
]
}
body = json.dumps([add_dict])
url = "https://" + config_data['host'] + config_data['endpoint'] + \
"/action/" + config_data['org_id']
res = requests.post(
url,
headers=headers(config_data, access_token),
data=body
)
if res.status_code != 200:
print(res.status_code)
print(res.headers)
print(res.text)
else:
results = json.loads(res.text)
if results.get('notCompleted') == 1:
print("Not completed!")
print(results.get('errors'))
return False
if results.get('completed') == 1:
print("Completed!")
return True
# CONFIG
def get_private_key(priv_key_filename):
"""Retrieve private key from file."""
priv_key_file = open(priv_key_filename)
priv_key = priv_key_file.read()
priv_key_file.close()
return priv_key
def get_user_config(filename=None):
"""Retrieve config data from file."""
# read configuration file
config = RawConfigParser()
config.read(filename)
config_dict = {
# server parameters
'host': config.get("server", "host"),
'endpoint': config.get("server", "endpoint"),
'ims_host': config.get("server", "ims_host"),
'ims_endpoint_jwt': config.get("server", "ims_endpoint_jwt"),
# enterprise parameters used to construct JWT
'domain': config.get("enterprise", "domain"),
'org_id': config.get("enterprise", "org_id"),
'api_key': config.get("enterprise", "api_key"),
'client_secret': config.get("enterprise", "client_secret"),
'tech_acct': config.get("enterprise", "tech_acct"),
'priv_key_filename': config.get("enterprise", "priv_key_filename"),
}
return config_dict
def prepare_jwt_token(config_data, priv_key):
"""Construct the JSON Web Token for auth."""
# set expiry time for JSON Web Token
expiry_time = int(time.time()) + 60 * 60 * 24
# create payload
payload = {
"exp": expiry_time,
"iss": config_data['org_id'],
"sub": config_data['tech_acct'],
"aud": "https://" + config_data['ims_host'] + "/c/" +
config_data['api_key'],
"https://" + config_data['ims_host'] + "/s/" + "ent_user_sdk": True
}
# create JSON Web Token
jwt_token = jwt.encode(payload, priv_key, algorithm='RS256')
# decode bytes into string
jwt_token = jwt_token.decode("utf-8")
return jwt_token
def prepare_access_token(config_data, jwt_token):
"""Generate the access token."""
# Method parameters
url = "https://" + config_data['ims_host'] + config_data['ims_endpoint_jwt']
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Cache-Control": "no-cache"
}
body_credentials = {
"client_id": config_data['api_key'],
"client_secret": config_data['client_secret'],
"jwt_token": jwt_token
}
body = urlencode(body_credentials)
# send http request
res = requests.post(url, headers=headers, data=body)
# evaluate response
if res.status_code == 200:
# extract token
access_token = json.loads(res.text)["access_token"]
return access_token
else:
# print response
print(res.status_code)
print(res.headers)
print(res.text)
return None
def generate_config(userconfig=None, private_key_filename=None):
"""Return tuple of necessary config data."""
# Get userconfig data
if userconfig:
user_config_path = userconfig
else:
# user_config_path = raw_input('Path to config file: ')
user_config_path = '/Library/CPE/lib/sysconfig/usermanagement.config'
if not os.path.isfile(str(user_config_path)):
print('Management config not found!')
sys.exit(1)
# Get private key
if private_key_filename:
priv_key_path = private_key_filename
else:
# priv_key_path = raw_input('Path to private key: ')
priv_key_path = '/Library/CPE/lib/sysconfig/private.key'
if not os.path.isfile(str(priv_key_path)):
print('Private key not found!')
sys.exit(1)
priv_key = get_private_key(priv_key_path)
# Get config data
config_data = get_user_config(user_config_path)
# Get the JWT
jwt_token = prepare_jwt_token(config_data, priv_key)
# Get the access token
access_token = prepare_access_token(config_data, jwt_token)
if not access_token:
print("Access token failed!")
sys.exit(1)
return (config_data, jwt_token, access_token)
def headers(config_data, access_token):
"""Return the headers needed."""
headers = {
"Content-type": "application/json",
"Accept": "application/json",
"x-api-key": config_data['api_key'],
"Authorization": "Bearer " + access_token
}
return headers
# PUBLIC FUNCTIONS
# Each of these will generate the config data it needs to work
def get_product_list():
"""Get list of products."""
(config_data, jwt_token, access_token) = generate_config()
productlist = _product_list(config_data, access_token)
products = []
for product in productlist:
products.append(product['groupName'])
return products
def get_user_list():
"""Get list of user emails."""
(config_data, jwt_token, access_token) = generate_config()
userlist = _user_list(config_data, access_token)
names = []
for user in userlist:
names.append(user['email'])
return names
def add_products(desired_products, target_user):
"""Add products to specific user."""
(config_data, jwt_token, access_token) = generate_config()
productlist = _product_list(config_data, access_token)
userlist = _user_list(config_data, access_token)
names = []
for user in userlist:
names.append(user['email'])
products = []
for product in productlist:
products.append(product['groupName'])
if target_user not in names:
print("Didn't find %s in userlist" % target_user)
return False
for product in desired_products:
if product not in products:
print("Didn't find %s in product list" % product)
return False
result = _add_product_to_user(
config_data,
desired_products,
target_user,
access_token,
)
return result
def does_user_have_product(target_user, product):
"""Return True/False if a user has the specified product."""
(config_data, jwt_token, access_token) = generate_config()
membership = _products_per_user(config_data, access_token, target_user)
return product in membership
def remove_products(removed_products, target_user):
"""Remove products from specific user."""
(config_data, jwt_token, access_token) = generate_config()
productlist = _product_list(config_data, access_token)
userlist = _user_list(config_data, access_token)
names = []
for user in userlist:
names.append(user['email'])
products = []
for product in productlist:
products.append(product['groupName'])
if target_user not in names:
print("Didn't find %s in userlist" % target_user)
return False
for product in removed_products:
if product not in products:
print("Didn't find %s in product list" % product)
return False
result = _remove_product_from_user(
config_data,
removed_products,
target_user,
access_token,
)
return result
def add_user(email, firstname, lastname, country='US'):
"""Add federated user account."""
(config_data, jwt_token, access_token) = generate_config()
result = _add_federated_user(
config_data,
access_token,
email,
country,
firstname,
lastname,
)
return result
def remove_user(email):
"""Remove user account."""
(config_data, jwt_token, access_token) = generate_config()
result = _remove_user_from_org(
config_data,
access_token,
email,
)
return result
def user_exists(user):
"""Does the user exist already as a federated ID?"""
(config_data, jwt_token, access_token) = generate_config()
result = _user_data(
config_data,
access_token,
user,
)
if result.get('type') == 'federatedID':
return True
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment