@zackn9ne
Created July 1, 2020 19:33
jssi.py
#!/usr/bin/python
# Copyright 2014-2017 Shea G. Craig
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""See docstring for JSSImporter class."""
from __future__ import absolute_import
import importlib
import os
import sys
import time
from collections import OrderedDict
from distutils.version import StrictVersion
from zipfile import ZipFile, ZIP_DEFLATED
from xml.sax.saxutils import escape
# ElementTree monkey patch borrowed with love from Matteo Ferla.
# https://blog.matteoferla.com/2019/02/uniprot-xml-and-python-elementtree.html
sys.modules.pop('xml.etree.ElementTree', None)
sys.modules['_elementtree'] = None
ElementTree = importlib.import_module('xml.etree.ElementTree')
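# Removing the cached module and blanking '_elementtree' forces the pure-Python
# ElementTree implementation to load instead of the C accelerator, so that its
# classes can be subclassed/patched by python-jss (the likely intent of the
# patch; see the linked post above).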
sys.path.insert(0, '/Library/AutoPkg/JSSImporter')
import jss
# Ensure that python-jss dependency is at minimum version
try:
from jss import __version__ as PYTHON_JSS_VERSION
except ImportError:
PYTHON_JSS_VERSION = "0.0.0"
from autopkglib import Processor, ProcessorError
__all__ = ["JSSImporter"]
__version__ = "1.1.0"
REQUIRED_PYTHON_JSS_VERSION = StrictVersion("2.1.0")
# Map Python 2 basestring type for Python 3.
if sys.version_info.major == 3:
basestring = str
# pylint: disable=too-many-instance-attributes, too-many-public-methods
class JSSImporter(Processor):
"""Uploads packages to configured Casper distribution points.
Optionally, creates supporting categories, computer groups, policy,
self service icon, extension attributes, and scripts.
File paths to support files are searched for in order:
1. Path as specified.
2. The parent folder of the path.
3. First ParentRecipe's folder.
4. First ParentRecipe's parent folder.
5. Second ParentRecipe's folder.
6. Second ParentRecipe's parent folder.
7. Nth ParentRecipe's folder.
8. Nth ParentRecipe's parent folder.
This search-path method is primarily in place to support using
recipe overrides. It applies to policy_template, computer group
templates, self_service_icon, script templates, and extension
attribute templates. It allows users to avoid having to copy the
file to the override directory for each recipe.
"""
input_variables = {
"prod_name": {
"required": True,
"description": "Name of the product.",
},
"jss_inventory_name": {
"required": False,
"description":
"Smart groups using the 'Application Title' " "criteria need "
"to specify the app's filename, as registered in the JSS's "
"inventory. If this variable is left out, it will generate an "
"'Application Title' by adding '.app' to the prod_name, e.g. "
"prod_name='Google Chrome', calculated "
"jss_inventory_name='Google Chrome.app'. If you need to "
"override this behavior, specify the correct name with this "
"variable.",
},
"pkg_path": {
"required": False,
"description":
"Path to a pkg or dmg to import - provided by "
"previous pkg recipe/processor.",
"default": "",
},
"version": {
"required": False,
"description":
"Version number of software to import - usually provided "
"by previous pkg recipe/processor, but if not, defaults to "
"'0.0.0.0'. ",
"default": "0.0.0.0"
},
"JSS_REPOS": {
"required": False,
"description":
"Array of dicts for each intended distribution point. Each "
"distribution point type requires slightly different "
"configuration keys and data. Please consult the "
"documentation. ",
"default": [],
},
"JSS_URL": {
"required": True,
"description":
"URL to a Jamf Pro server that the API user has write access "
"to, optionally set as a key in the com.github.autopkg "
"preference file.",
},
"API_USERNAME": {
"required": True,
"description":
"Username of account with appropriate access to "
"jss, optionally set as a key in the com.github.autopkg "
"preference file.",
},
"API_PASSWORD": {
"required": True,
"description":
"Password of api user, optionally set as a key in "
"the com.github.autopkg preference file.",
},
"JSS_VERIFY_SSL": {
"required": False,
"description":
"If set to False, SSL verification in communication "
"with the Jamf Pro server will be skipped. Defaults to 'True'.",
"default": True,
},
"JSS_SUPPRESS_WARNINGS": {
"required": False,
"description":
"Determines whether to suppress urllib3 warnings. "
"If you choose not to verify SSL with JSS_VERIFY_SSL, urllib3 "
"throws warnings for each of the numerous requests "
"JSSImporter makes. If you would like to see them, set to "
"'False'. Defaults to 'True'.",
"default": True,
},
"category": {
"required": False,
"description":
"Category to create/associate imported app "
"package with. Defaults to 'No category assigned'.",
},
"policy_category": {
"required": False,
"description":
"Category to create/associate policy with. Defaults"
" to 'No category assigned'.",
},
"force_policy_state": {
"required": False,
"description":
"If set to False JSSImporter will not override the policy "
"enabled state. This allows creating new policies in a default "
"state and then going and manually enabling them in the Jamf Pro server. "
"Boolean, defaults to 'True'",
"default": True,
},
"os_requirements": {
"required": False,
"description":
"Comma-seperated list of OS version numbers to "
"allow. Corresponds to the OS Requirements field for "
"packages. The character 'x' may be used as a wildcard, as "
"in '10.9.x'",
"default": ""
},
"package_info": {
"required": False,
"description": "Text to apply to the package's Info field.",
"default": ""
},
"package_notes": {
"required": False,
"description": "Text to apply to the package's Notes field.",
"default": ""
},
"package_priority": {
"required": False,
"description":
"Priority to use for deploying or uninstalling the "
"package. Value between 1-20. Defaults to '10'",
"default": "10"
},
"package_reboot": {
"required": False,
"description":
"Computers must be restarted after installing the package "
"Boolean. Defaults to 'False'",
"default": "False"
},
"package_boot_volume_required": {
"required": False,
"description":
"Ensure that the package is installed on the boot drive "
"after imaging. Boolean. Defaults to 'True'",
"default": "True"
},
"groups": {
"required": False,
"description":
"Array of group dictionaries. Wrap each group in a "
"dictionary. Group keys include 'name' (Name of the group to "
"use, required), 'smart' (Boolean: static group=False, smart "
"group=True, default is False, not required), "
"template_path' (string: path to template file to use for "
"group, required for smart groups, invalid for static groups), "
"and 'do_update' (Boolean: default is True, not required)",
},
"exclusion_groups": {
"required": False,
"description":
"Array of group dictionaries. Wrap each group in a "
"dictionary. Group keys include 'name' (Name of the group to "
"use, required), 'smart' (Boolean: static group=False, smart "
"group=True, default is False, not required), and "
"template_path' (string: path to template file to use for "
"group, required for smart groups, invalid for static groups), "
"and 'do_update' (Boolean: default is True, not required)",
},
"scripts": {
"required": False,
"description":
"Array of script dictionaries. Wrap each script in "
"a dictionary. Script keys include 'name' (Name of the script "
"to use, required), 'template_path' (string: path to template "
"file to use for script, required)",
},
"extension_attributes": {
"required": False,
"description":
"Array of extension attribute dictionaries. Wrap each "
"extension attribute in a dictionary. Script keys include: "
"'ext_attribute_path' (string: path to extension attribute "
"file.)",
},
"policy_template": {
"required": False,
"description":
"Filename of policy template file. If key is "
"missing or value is blank, policy creation will be skipped.",
"default": "",
},
"policy_action_type": {
"required": False,
"description":
"Type of policy 'package_configuration' to perform. Must be "
"one of 'Install', 'Cache', 'Install Cached'.",
"default": "Install",
},
"self_service_description": {
"required": False,
"description":
"Use to populate the %SELF_SERVICE_DESCRIPTION% variable for "
"use in templates. Primary use is for filling the info button "
"text in Self Service, but could be used elsewhere.",
"default": "",
},
"self_service_icon": {
"required": False,
"description":
"Path to an icon file. Use to add an icon to a "
"self-service enabled policy. Because of the way Casper "
"handles this, the JSSImporter will only upload if the icon's "
"filename is different than the one set on the policy (if it "
"even exists). Please see the README for more information.",
"default": "",
},
"site_id": {
"required": False,
"description": "ID of the target Site",
},
"site_name": {
"required": False,
"description": "Name of the target Site",
},
"STOP_IF_NO_JSS_UPLOAD": {
"required": False,
"default": True,
"description":
("If True, the processor will stop after verifying that "
"a PKG upload was not required since a PKG of the same name "
"is already present on the server"),
},
"skip_scope": {
"required": False,
"default": False,
"description":
("If True, policy scope will not be updated. By default group "
"and policy updates are coupled together. This allows group "
"updates without updating or overwriting policy scope."),
},
"skip_scripts": {
"required": False,
"default": False,
"description": "If True, policy scripts will not be updated.",
},
}
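# Illustrative shapes for the list-type inputs above (names and paths are
# hypothetical, not defaults):
#   groups = [{"name": "Testers", "smart": True,
#              "template_path": "SmartGroupTemplate.xml", "do_update": True}]
#   scripts = [{"name": "postinstall.sh",
#               "template_path": "ScriptTemplate.xml"}]
#   extension_attributes = [{"ext_attribute_path": "extattr.xml"}]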
output_variables = {
"jss_changed_objects": {
"description": "Dictionary of added or changed values."
},
"jss_importer_summary_result": {
"description": "Description of interesting results."
},
}
description = __doc__
def __init__(self, env=None, infile=None, outfile=None):
"""Sets attributes here."""
super(JSSImporter, self).__init__(env, infile, outfile)
self.jss = None
self.pkg_name = None
self.prod_name = None
self.version = None
self.category = None
self.policy_category = None
self.package = None
self.replace_dict = {}
self.extattrs = None
self.groups = None
self.exclusion_groups = None
self.scripts = None
self.policy = None
self.upload_needed = False
def create_jss(self):
"""Create a JSS object for API calls"""
kwargs = {
'url': self.env['JSS_URL'],
'user': self.env['API_USERNAME'],
'password': self.env['API_PASSWORD'],
'ssl_verify': self.env["JSS_VERIFY_SSL"],
'repo_prefs': self.env["JSS_REPOS"]}
self.jss = jss.JSS(**kwargs)
if self.env.get('verbose', 1) >= 4:
self.jss.verbose = True
def init_jss_changed_objects(self):
"""Build a dictionary to track changes to JSS objects."""
keys = (
"jss_repo_updated", "jss_category_added", "jss_package_added",
"jss_package_updated", "jss_group_added", "jss_group_updated",
"jss_script_added", "jss_script_updated",
"jss_extension_attribute_added", "jss_extension_attribute_updated",
"jss_policy_added", "jss_policy_updated", "jss_icon_uploaded")
self.env["jss_changed_objects"] = {key: [] for key in keys}
def repo_type(self):
"""returns the type of repo"""
if self.env["JSS_REPOS"]:
try:
repo = self.env["JSS_REPOS"][0]["type"]
except KeyError:
repo = "DP"
else:
return
return repo
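# Note: repo_type() only inspects the first JSS_REPOS entry, e.g. (hypothetical,
# other keys per the python-jss documentation):
#   JSS_REPOS = [{"type": "SMB", ...}]   -> "SMB"
#   JSS_REPOS = [{...no "type" key...}]  -> "DP"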
def wait_for_id(self, obj_cls, obj_name):
"""wait for feedback that the object is there"""
object = None
search_method = getattr(self.jss, obj_cls.__name__)
# limit time to wait to get a package ID.
timeout = time.time() + 120
while time.time() < timeout:
try:
object = search_method(obj_name)
if object.id != 0:
self.output("{} ID '{}' verified on server".format(obj_cls.__name__, object.id))
self.upload_needed = True
return object
else:
self.output("Waiting to get {} ID from server (reported: {})...".format(obj_cls.__name__,
object.id))
time.sleep(10)
except jss.GetError:
self.output("Waiting to get {} ID from server (none reported)...".format(obj_cls.__name__))
time.sleep(10)
def handle_category(self, category_type, category_name=None):
"""Ensure a category is present."""
if self.env.get(category_type):
category_name = self.env.get(category_type)
if category_name is not None:
try:
category = self.jss.Category(category_name)
category_name = category.name
self.output(
"Category, type '{}', name '{}', already exists on the Jamf Pro server, "
"moving on...".format(category_type, category_name))
except jss.GetError:
# Category doesn't exist
category = jss.Category(self.jss, category_name)
category.save()
self.wait_for_id(jss.Category, category_name)
try:
category.id
self.output(
"Category, type '{}', name '{}', created.".format(category_type,
category_name))
self.env["jss_changed_objects"]["jss_category_added"].append(
category_name)
except ValueError:
raise ProcessorError("Failed to get category ID from {}.".format(self.repo_type()))
else:
category = None
return category
def handle_package(self):
"""Creates or updates, and copies a package object.
This will only upload a package if a file with the same name
does not already exist on a DP. If you need to force a
re-upload, you must delete the package on the DP first.
Further, if you are using a JDS, it will only upload a package
if a package object with a filename matching the AutoPkg
filename does not exist. If you need to force a re-upload to a
JDS, please delete the package object through the web interface
first.
"""
# Skip package handling if there is no package or repos.
pkg_path = self.env["pkg_path"]
if self.repo_type() is None or pkg_path == "":
self.output("Package upload and object update skipped. If this is "
"a mistake, ensure you have JSS_REPOS configured.")
return
# Ensure that `pkg_path` is valid.
if not os.path.exists(pkg_path):
raise ProcessorError(
"JSSImporter can't find a package at '{}'!".format(pkg_path))
# See if the package is non-flat (requires zipping prior to
# upload).
if os.path.isdir(pkg_path):
self.output("Package object is a bundle. Converting to zip...")
pkg_path = self.zip_pkg_path(pkg_path)
self.env["pkg_path"] = pkg_path
# Make sure our change gets added back into the env for
# visibility.
self.pkg_name += ".zip"
# now check if the package object already exists
try:
package = self.jss.Package(self.pkg_name)
self.output("Package object already exists on the Jamf Pro server. "
"(ID: {})".format(package.id))
pkg_update = (self.env["jss_changed_objects"]["jss_package_updated"])
# for cloud DPs we must assume that the package object means there is an associated package
except jss.GetError:
# Package doesn't exist
self.output("Package object does not already exist on the Jamf Pro server.")
# for CDP or JDS types, the package has to be uploaded first to generate a package object
# then we wait for the package ID, then we can continue to assign attributes to the package
# object.
if self.repo_type() == "JDS" or self.repo_type() == "CDP" or self.repo_type() == "AWS":
self.copy(pkg_path)
package = self.wait_for_id(jss.Package, self.pkg_name)
try:
package.id
pkg_update = (self.env["jss_changed_objects"]["jss_package_added"])
except ValueError:
raise ProcessorError("Failed to get Package ID from {}.".format(self.repo_type()))
elif self.repo_type() == "DP" or self.repo_type() == "SMB" or self.repo_type() == "AFP" or self.repo_type() == "Local":
# for AFP/SMB shares, we create the package object first and then copy the package
# if it is not already there
self.output("Creating Package object...")
package = jss.Package(self.jss, self.pkg_name)
pkg_update = (self.env["jss_changed_objects"]["jss_package_added"])
else:
# repo type that is not supported
raise ProcessorError(
"JSSImporter can't upload the Package at '{}'! Repo type {} is not supported. Please reconfigure your JSSImporter prefs.".format(pkg_path, self.repo_type()))
# For local DPs we check that the package is already on the distribution point and upload it if not
if self.repo_type() == "DP" or self.repo_type() == "SMB" or self.repo_type() == "AFP" or self.repo_type() == "Local":
if self.jss.distribution_points.exists(os.path.basename(pkg_path)):
self.output("Package upload not required.")
self.upload_needed = False
else:
self.copy(pkg_path)
self.output("Package {} uploaded to distribution point.".format(self.pkg_name))
self.upload_needed = True
# only update the package object if an upload was carried out
if not self.upload_needed:
if (self.env["STOP_IF_NO_JSS_UPLOAD"]) == "True":
self.output("Not overwriting policy, you may consider leaving this flag off entirely: upload_needed {} "
"and STOP_IF_NO_JSS_UPLOAD is set to {}.".format(self.upload_needed, self.env["STOP_IF_NO_JSS_UPLOAD"]))
self.env["stop_processing_recipe"] = True
return
elif (self.env["STOP_IF_NO_JSS_UPLOAD"]) == True:
self.output("Not overwriting policy, default behavior detected: upload_needed {} "
"and STOP_IF_NO_JSS_UPLOAD is set to {}.".format(self.upload_needed, self.env["STOP_IF_NO_JSS_UPLOAD"]))
self.env["stop_processing_recipe"] = True
return
elif (self.env["STOP_IF_NO_JSS_UPLOAD"]) == "False":
self.output("Proceeding to overwrite policy and: {} "
"because STOP_IF_NO_JSS_UPLOAD is set to {}.".format(self.upload_needed,self.env["STOP_IF_NO_JSS_UPLOAD"]))
else:
self.output("Invalid STOP_IF_NO_JSS_UPLOAD value given so not overwriting "
"STOP_IF_NO_JSS_UPLOAD is effectivly being set to True: upload_needed {}. STOP_IF_NO_JSS_UPLOAD of {} is not allowed effectively acting as False".format(self.upload_needed, self.env["STOP_IF_NO_JSS_UPLOAD"]))
self.env["stop_processing_recipe"] = True
return
return
# now update the package object
os_requirements = self.env.get("os_requirements")
package_info = self.env.get("package_info")
package_notes = self.env.get("package_notes")
package_priority = self.env.get("package_priority")
package_reboot = self.env.get("package_reboot")
package_boot_volume_required = self.env.get(
"package_boot_volume_required")
if self.category is not None:
cat_name = self.category.name
else:
cat_name = ""
if self.repo_type() == "JDS" or self.repo_type() == "CDP" or self.repo_type() == "AWS":
self.wait_for_id(jss.Package, self.pkg_name)
try:
package.id
pkg_update = (self.env["jss_changed_objects"]["jss_package_added"])
except ValueError:
raise ProcessorError("Failed to get Package ID from {}.".format(self.repo_type()))
self.update_object(cat_name, package, "category", pkg_update)
self.update_object(os_requirements, package, "os_requirements",
pkg_update)
self.update_object(package_info, package, "info", pkg_update)
self.update_object(package_notes, package, "notes", pkg_update)
self.update_object(package_priority, package, "priority",
pkg_update)
self.update_object(package_reboot, package, "reboot_required",
pkg_update)
self.update_object(package_boot_volume_required, package,
"boot_volume_required", pkg_update)
return package
def zip_pkg_path(self, path):
"""Add files from path to a zip file handle.
Args:
path (str): Path to folder to zip.
Returns:
(str) name of resulting zip file.
"""
zip_name = "{}.zip".format(path)
with ZipFile(
zip_name, "w", ZIP_DEFLATED, allowZip64=True) as zip_handle:
for root, _, files in os.walk(path):
for member in files:
zip_handle.write(os.path.join(root, member))
self.output("Closing: {}".format(zip_name))
return zip_name
def handle_extension_attributes(self):
"""Add extension attributes if needed."""
extattrs = self.env.get("extension_attributes")
results = []
if extattrs:
for extattr in extattrs:
extattr_object = self.update_or_create_new(
jss.ComputerExtensionAttribute,
extattr["ext_attribute_path"],
update_env="jss_extension_attribute_added",
added_env="jss_extension_attribute_updated")
results.append(extattr_object)
return results
def handle_groups(self, groups):
"""Manage group existence and creation."""
computer_groups = []
if groups:
for group in groups:
# self.output("Computer Group to process: {}".format(group["name"]))
if self.validate_input_var(group):
is_smart = group.get("smart", False)
if is_smart:
computer_group = self.add_or_update_smart_group(group)
else:
computer_group = (
self.add_or_update_static_group(group))
computer_groups.append(computer_group)
return computer_groups
def handle_scripts(self):
"""Add scripts if needed."""
scripts = self.env.get("scripts")
results = []
if scripts:
for script in scripts:
self.output('Looking for Script file {}...'.format(script["name"]))
script_file = self.find_file_in_search_path(
script["name"])
try:
with open(script_file) as script_handle:
script_contents = script_handle.read()
except IOError:
raise ProcessorError(
"Script '{}' could not be read!".format(script_file))
script_object = self.update_or_create_new(
jss.Script,
script["template_path"],
os.path.basename(script_file),
added_env="jss_script_added",
update_env="jss_script_updated",
script_contents=script_contents)
results.append(script_object)
return results
def handle_policy(self):
"""Create or update a policy."""
if self.env.get("policy_template"):
template_filename = self.env.get("policy_template")
policy = self.update_or_create_new(
jss.Policy, template_filename, update_env="jss_policy_updated",
added_env="jss_policy_added")
# self.output("PolicyPackage object: {}".format(policy.id))
else:
self.output("Policy creation not desired, moving on...")
policy = None
return policy
def handle_icon(self):
"""Add self service icon if needed."""
# Icons are tricky. The only way to add new ones is to use
# FileUploads. If you manually upload them, you can add them to
# a policy to get their ID, but there is no way to query the Jamf Pro server
# to see what icons are available. Thus, icon handling involves
# several cooperating methods. If we just add an icon every
# time we run a recipe, however, we end up with a ton of
# redundant icons, and short of actually deleting them in the
# sql database, there's no way to delete icons. So when we run,
# we first check for an existing policy, and if it exists, copy
# its icon XML, which is then added to the templated Policy. If
# there is no icon information, but the recipe specifies one,
# then FileUpload it up.
# If no policy handling is desired, we can't upload an icon.
if self.env.get("self_service_icon") and self.policy is not None:
# Search through search-paths for icon file.
self.output('Looking for Icon file {}...'.format(self.env["self_service_icon"]))
icon_path = self.find_file_in_search_path(
self.env["self_service_icon"])
icon_filename = os.path.basename(icon_path)
# Compare the filename in the policy to the one provided by
# the recipe. If they don't match, we need to upload a new
# icon.
policy_filename = self.policy.findtext(
"self_service/self_service_icon/filename")
if not policy_filename == icon_filename:
self.output("Icon name in existing policy: {}".format(policy_filename))
icon = jss.FileUpload(self.jss, "policies", "id",
self.policy.id, icon_path)
icon.save()
self.env["jss_changed_objects"]["jss_icon_uploaded"].append(
icon_filename)
self.output("Icon uploaded to the Jamf Pro server.")
else:
self.output("Icon matches existing icon, moving on...")
def update_object(self, data, obj, path, update):
"""Update an object if it differs.
If a value differs between the recipe and the object, update
the object to reflect the change, and add the object to a
summary list.
Args:
data: Recipe string value to enforce.
obj: JSSObject type to set data on.
path: String path to desired XML.
update: Summary list object to append obj to if something
is changed.
"""
if data != obj.findtext(path):
obj.find(path).text = data
obj.save()
self.output("{} '{}' updated.".format(
str(obj.__class__).split(".")[-1][:-2], path))
update.append(obj.name)
def copy(self, source_item, id_=-1):
"""Copy a package or script using the JSS_REPOS preference."""
self.output("Copying {} to all distribution points.".format(source_item))
def output_copy_status(connection):
"""Output AutoPkg copying status."""
self.output("Copying to {}".format(connection["url"]))
self.jss.distribution_points.copy(source_item, id_=id_,
pre_callback=output_copy_status)
self.env["jss_changed_objects"]["jss_repo_updated"].append(
os.path.basename(source_item))
self.output("Copied '{}'".format(source_item))
def build_replace_dict(self):
"""Build dict of replacement values based on available input."""
# First, add in AutoPkg's env, excluding types that don't make
# sense:
replace_dict = {key: val for key, val in self.env.items()
if val is not None and isinstance(val, basestring)}
# Next, add in "official" and Legacy input variables.
replace_dict["VERSION"] = self.version
if self.package is not None:
replace_dict["PKG_NAME"] = self.package.name
replace_dict["PROD_NAME"] = self.env.get("prod_name")
if self.env.get("site_id"):
replace_dict["SITE_ID"] = self.env.get("site_id")
if self.env.get("site_name"):
replace_dict["SITE_NAME"] = self.env.get("site_name")
replace_dict["SELF_SERVICE_DESCRIPTION"] = self.env.get(
"self_service_description")
replace_dict["SELF_SERVICE_ICON"] = self.env.get(
"self_service_icon")
# policy_category is not required, so set a default value if
# absent.
replace_dict["POLICY_CATEGORY"] = self.env.get(
"policy_category") or "Unknown"
# Some applications may have a product name that differs from
# the name that the JSS uses for its "Application Title"
# inventory field. If so, you can set it with the
# jss_inventory_name input variable. If this variable is not
# specified, it will just append .app, which is how most apps
# work.
if self.env.get("jss_inventory_name"):
replace_dict["JSS_INVENTORY_NAME"] = self.env.get(
"jss_inventory_name")
else:
replace_dict["JSS_INVENTORY_NAME"] = ("%s.app" %
self.env.get("prod_name"))
self.replace_dict = replace_dict
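# For example (hypothetical values): with prod_name="Google Chrome", no
# jss_inventory_name and no policy_category, templates can reference
# %PROD_NAME%, %VERSION%, %PKG_NAME%, %SELF_SERVICE_DESCRIPTION%,
# %POLICY_CATEGORY% ("Unknown") and %JSS_INVENTORY_NAME% ("Google Chrome.app").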
# pylint: disable=too-many-arguments
def update_or_create_new(self, obj_cls, template_path, name="",
added_env="", update_env="", script_contents=""):
"""Check for an existing object and update it, or create a new
object.
Args:
obj_cls: The python-jss object class to work with.
template_path: String filename or path to the template
file. See get_templated_object() for more info.
name: The name to use. Defaults to the "name" property of
the templated object.
added_env: The environment var to update if an object is
added.
update_env: The environment var to update if an object is
updated.
script_contents (str): XML escaped script.
Returns:
The recipe object after updating.
"""
# Create a new object from the template
recipe_object = self.get_templated_object(obj_cls, template_path)
# Ensure categories exist prior to using them in an object.
# Text replacement has already happened, so categories should
# be in place.
try:
category_name = recipe_object.category.text
except AttributeError:
category_name = None
if category_name is not None:
self.handle_category(obj_cls.root_tag, category_name)
if not name:
name = recipe_object.name
# Check for an existing object with this name.
existing_object = None
search_method = getattr(self.jss, obj_cls.__name__)
try:
existing_object = search_method(name)
except jss.GetError:
pass
# If object is a Policy, we need to inject scope, scripts,
# package, and an icon.
if obj_cls is jss.Policy:
# If a policy_category has been given as an input variable,
# it wins. Replace whatever is in the template, and add in
# a category tag if it isn't there.
if self.env.get('policy_category'):
policy_category = recipe_object.find('category')
if policy_category is None:
policy_category = ElementTree.SubElement(
recipe_object, "category")
policy_category.text = self.env.get('policy_category')
if existing_object is not None:
# If this policy already exists, and it has an icon set,
# copy its icon section to our template, as we have no
# other way of getting this information.
icon_xml = existing_object.find(
"self_service/self_service_icon")
if icon_xml is not None:
self.add_icon_to_policy(recipe_object, icon_xml)
if not self.env.get('force_policy_state'):
state = existing_object.find('general/enabled').text
recipe_object.find('general/enabled').text = state
# If skip_scope is True then don't include scope data.
if self.env["skip_scope"] is not True:
self.add_scope_to_policy(recipe_object)
else:
self.output("Skipping assignment of scope as skip_scope is set to {}".format(self.env["skip_scope"]))
# If skip_scripts is True then don't include scripts data.
if self.env["skip_scripts"] is not True:
self.add_scripts_to_policy(recipe_object)
else:
self.output("Skipping assignment of scripts as skip_scripts is set to {}".format(self.env["skip_scripts"]))
# add package to policy if there is one
self.add_package_to_policy(recipe_object)
# If object is a script, add the passed contents to the object.
# throw it into the `script_contents` tag of the object.
if obj_cls is jss.Script and script_contents:
tag = ElementTree.SubElement(recipe_object, "script_contents")
tag.text = script_contents
if existing_object is not None:
# Update the existing object.
# Copy the ID from the existing object to the new one so
# that it knows how to save itself.
recipe_object._basic_identity["id"] = existing_object.id
recipe_object.save()
# get feedback that the object has been created
object = self.wait_for_id(obj_cls, name)
try:
object.id
# Retrieve the updated XML.
recipe_object = search_method(name)
self.output("{} '{}' updated.".format(obj_cls.__name__, name))
if update_env:
self.env["jss_changed_objects"][update_env].append(name)
except ValueError:
raise ProcessorError("Failed to get {} ID from {}.".format(obj_cls.__name__, self.repo_type()))
else:
# Object doesn't exist yet.
recipe_object.save()
# get feedback that the object has been created
object = self.wait_for_id(obj_cls, name)
try:
object.id
self.output("{} '{}' created.".format(obj_cls.__name__, name))
if added_env:
self.env["jss_changed_objects"][added_env].append(name)
except ValueError:
raise ProcessorError("Failed to get {} ID from {}.".format(obj_cls.__name__, self.repo_type()))
return recipe_object
# pylint: enable=too-many-arguments
def get_templated_object(self, obj_cls, template_path):
"""Return an object based on a template located in search path.
Args:
obj_cls: JSSObject class (for the purposes of JSSImporter a
Policy or a ComputerGroup)
template_path: String filename or path to template file.
See find_file_in_search_path() for more information on
file searching.
Returns:
A JSS Object created based on the template,
post-text-replacement.
"""
self.output('Looking for {} template file {}...'.format(obj_cls.__name__, os.path.basename(template_path)))
final_template_path = self.find_file_in_search_path(template_path)
# Open and return a new object.
with open(final_template_path, "r") as template_file:
text = template_file.read()
template = self.replace_text(text, self.replace_dict)
return obj_cls.from_string(self.jss, template)
def find_file_in_search_path(self, path):
"""Search search_paths for the first existing instance of path.
Searches, in order, through the following directories
until a matching file is found:
1. Path as specified.
2. The parent folder of the path.
3. First ParentRecipe's folder.
4. First ParentRecipe's parent folder.
5. Second ParentRecipe's folder.
6. Second ParentRecipe's parent folder.
7. Nth ParentRecipe's folder.
8. Nth ParentRecipe's parent folder.
This search-path method is primarily in place to
support using recipe overrides. It allows users to avoid having
to copy templates, icons, etc, to the override directory.
Args:
path: String filename or path to file.
If path is just a filename, path is assumed to
be self.env["RECIPE_DIR"].
Returns:
Absolute path to the first match in search paths.
Raises:
ProcessorError if none of the above files exist.
"""
# Ensure input is expanded.
path = os.path.expanduser(path)
# Check to see if path is a filename.
if not os.path.dirname(path):
# If so, assume that the file is meant to be in the recipe
# directory.
path = os.path.join(self.env["RECIPE_DIR"], path)
filename = os.path.basename(path)
parent_recipe_dirs = [os.path.dirname(parent) for parent in
self.env["PARENT_RECIPES"]]
unique_parent_dirs = OrderedDict()
for parent in parent_recipe_dirs:
unique_parent_dirs[parent] = parent
search_dirs = ([os.path.dirname(path)] + list(unique_parent_dirs))
tested = []
final_path = ""
# Look for the first file that exists in the search_dirs and
# their parent folders.
for search_dir in search_dirs:
test_path = os.path.join(search_dir, filename)
test_parent_folder_path = os.path.abspath(
os.path.join(search_dir, "..", filename))
if os.path.exists(test_path):
final_path = test_path
elif os.path.exists(test_parent_folder_path):
final_path = test_parent_folder_path
tested.append(test_path)
tested.append(test_parent_folder_path)
if final_path:
self.output("Found file: {}".format(final_path))
break
if not final_path:
raise ProcessorError(
"Unable to find file {} at any of the following locations: {}"
.format(filename, tested))
return final_path
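# For example (hypothetical layout): with path "PolicyTemplate.xml", RECIPE_DIR
# "/overrides" and one parent recipe at "/recipes/Firefox/Firefox.jss.recipe",
# the candidates are tried in this order: /overrides/PolicyTemplate.xml,
# /PolicyTemplate.xml, /recipes/Firefox/PolicyTemplate.xml and
# /recipes/PolicyTemplate.xml.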
def replace_text(self, text, replace_dict): # pylint: disable=no-self-use
"""Substitute items in a text string. Also escapes for XML,
as this is the only use for this definition
Args:
text: A string with embedded %tags%.
replace_dict: A dict, where
key: Corresponds to the % delimited tag in text.
value: Text to swap in.
Returns:
The text after replacement.
"""
for key in replace_dict:
# Wrap our keys in % to match template tags.
value = escape(replace_dict[key])
text = text.replace("%%%s%%" % key, value)
return text
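# A minimal illustration (hypothetical values):
#   replace_text("Install %PROD_NAME% <notes>%VERSION%</notes>",
#                {"PROD_NAME": "Firefox & Co", "VERSION": "78.0"})
#   returns "Install Firefox &amp; Co <notes>78.0</notes>".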
def validate_input_var(self, var): # pylint: disable=no-self-use
"""Validate the value before trying to add a group.
Args:
var: Dictionary to check for problems.
Returns: False if dictionary has invalid values, or True if it
seems okay.
"""
# Skipping non-string values:
# Does group name or template have a replacement var
# that has not been replaced?
# Does the group have a blank value? (A blank value isn't really
# invalid, but there's no need to process it further.)
invalid = [False for value in var.values() if isinstance(value, str)
and (value.startswith("%") and value.endswith("%"))]
if not var.get('name') or not var.get('template_path'):
invalid = True
return False if invalid else True
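# For instance (hypothetical): {"name": "Testers", "template_path": "grp.xml"}
# passes, while {"name": "%GROUP_NAME%", "template_path": "grp.xml"} (an
# unsubstituted tag) or a dict missing 'template_path' is rejected.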
def add_or_update_smart_group(self, group):
"""Either add a new group or update existing group."""
# Build the template group object
self.replace_dict["group_name"] = group["name"]
if group.get("site_id"):
self.replace_dict["site_id"] = group.get("site_id")
if group.get("site_name"):
self.replace_dict["site_name"] = group.get("site_name")
# If do_update is set to False, do not update this object
do_update = group.get("do_update", True)
if not do_update:
try:
computer_group = self.jss.ComputerGroup(group["name"])
self.output("ComputerGroup '%s' already exists "
"and set not to update." %
computer_group.name)
return computer_group
except jss.GetError:
self.output("ComputerGroup '%s' does not already exist. "
"Creating from template." %
group["name"])
computer_group = self.update_or_create_new(
jss.ComputerGroup, group["template_path"],
update_env="jss_group_updated", added_env="jss_group_added")
return computer_group
def add_or_update_static_group(self, group):
"""Either add a new group or update existing group."""
# Check for pre-existing group first
try:
computer_group = self.jss.ComputerGroup(group["name"])
self.output("Computer Group: {} already exists."
.format(computer_group.name))
except jss.GetError:
computer_group = jss.ComputerGroup(self.jss, group["name"])
computer_group.save()
self.output("Computer Group '{}' created.".format(computer_group.name))
self.env["jss_changed_objects"]["jss_group_added"].append(
computer_group.name)
return computer_group
def add_scope_to_policy(self, policy_template):
"""Incorporate scoping groups into a policy."""
computer_groups_element = self.ensure_xml_structure(
policy_template, "scope/computer_groups")
exclusion_groups_element = self.ensure_xml_structure(
policy_template, "scope/exclusions/computer_groups")
for group in self.groups:
policy_template.add_object_to_path(group, computer_groups_element)
for group in self.exclusion_groups:
policy_template.add_object_to_path(group, exclusion_groups_element)
def add_scripts_to_policy(self, policy_template):
"""Incorporate scripts into a policy."""
scripts_element = self.ensure_xml_structure(policy_template, "scripts")
for script in self.scripts:
script_element = policy_template.add_object_to_path(
script, scripts_element)
priority = ElementTree.SubElement(script_element, "priority")
priority.text = script.findtext("priority")
def add_package_to_policy(self, policy_template):
"""Add a package to a self service policy."""
if self.package is not None:
self.ensure_xml_structure(policy_template,
"package_configuration/packages")
action_type = self.env['policy_action_type']
self.output("Setting policy to '%s' package." % action_type)
policy_template.add_package(self.package, action_type=action_type)
def add_icon_to_policy(self, policy_template, icon_xml):
"""Add an icon to a self service policy."""
self.ensure_xml_structure(policy_template, "self_service")
self_service = policy_template.find("self_service")
self_service.append(icon_xml)
def ensure_xml_structure(self, element, path):
"""Ensure that all tiers of an XML hierarchy exist."""
search, _, path = path.partition("/")
if search:
if element.find(search) is None:
ElementTree.SubElement(element, search)
return self.ensure_xml_structure(element.find(search), path)
return element
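# For example, ensure_xml_structure(policy, "scope/computer_groups") creates
# <scope><computer_groups/></scope> under the policy element as needed and
# returns the innermost (computer_groups) element.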
def get_report_string(self, items): # pylint: disable=no-self-use
"""Return human-readable string from a list of Jamf Pro API objects."""
return ", ".join(set(items))
def summarize(self):
"""If anything has been added or updated, report back."""
# Only summarize if something has happened.
if any(value for value in self.env["jss_changed_objects"].values()):
# Create a blank summary.
self.env["jss_importer_summary_result"] = {
"summary_text": "The following changes were made to the Jamf Pro Server:",
"report_fields": [
"Name", "Package", "Categories", "Groups", "Scripts",
"Extension_Attributes", "Policy", "Icon", "Version",
"Package_Uploaded"],
"data": {
"Name": "",
"Package": "",
"Categories": "",
"Groups": "",
"Scripts": "",
"Extension_Attributes": "",
"Policy": "",
"Icon": "",
"Version": "",
"Package_Uploaded": ""
}
}
# TODO: This is silly. Use a defaultdict for storing changes
# and just copy the stuff that changed.
# Shortcut variables for lovely code conciseness
changes = self.env["jss_changed_objects"]
data = self.env["jss_importer_summary_result"]["data"]
data["Name"] = self.env.get('NAME', '')
data["Version"] = self.env.get('version', '')
package = self.get_report_string(changes["jss_package_added"] +
changes["jss_package_updated"])
if package:
data["Package"] = package
policy = changes["jss_policy_updated"] + (
changes["jss_policy_added"])
if policy:
data["Policy"] = self.get_report_string(policy)
if changes["jss_icon_uploaded"]:
data["Icon"] = os.path.basename(self.env["self_service_icon"])
# Get nice strings for our list-types.
if changes["jss_category_added"]:
data["Categories"] = self.get_report_string(
changes["jss_category_added"])
groups = changes["jss_group_updated"] + changes["jss_group_added"]
if groups:
data["Groups"] = self.get_report_string(groups)
scripts = changes["jss_script_updated"] + (
changes["jss_script_added"])
if scripts:
data["Scripts"] = self.get_report_string(scripts)
extattrs = changes["jss_extension_attribute_updated"] + (
changes["jss_extension_attribute_added"])
if extattrs:
data["Extension_Attributes"] = self.get_report_string(extattrs)
jss_package_uploaded = self.get_report_string(changes["jss_repo_updated"])
if jss_package_uploaded:
data["Package_Uploaded"] = "True"
def main(self):
"""Main processor code."""
# Ensure we have the right version of python-jss
python_jss_version = StrictVersion(PYTHON_JSS_VERSION)
self.output("python-jss version: {}.".format(python_jss_version))
if python_jss_version < REQUIRED_PYTHON_JSS_VERSION:
self.output(
"python-jss version is too old. Please update to version: {}."
.format(REQUIRED_PYTHON_JSS_VERSION))
raise ProcessorError
self.output("JSSImporter version: {}.".format(__version__))
# clear any pre-existing summary result
if "jss_importer_summary_result" in self.env:
del self.env["jss_importer_summary_result"]
self.create_jss()
self.output("Jamf Pro version: '{}'".format(self.jss.version()))
self.pkg_name = os.path.basename(self.env["pkg_path"])
self.prod_name = self.env["prod_name"]
self.version = self.env.get("version")
if self.version == "0.0.0.0":
self.output(
"Warning: No `version` was added to the AutoPkg env up to "
"this point. JSSImporter is defaulting to version {}!"
.format(self.version))
# Build and init jss_changed_objects
self.init_jss_changed_objects()
self.category = self.handle_category("category")
self.policy_category = self.handle_category("policy_category")
# Get our DPs ready for copying.
if len(self.jss.distribution_points) == 0:
self.output("Warning: No distribution points configured!")
for dp in self.jss.distribution_points:
self.output("Checking if DP already mounted...")
dp.was_mounted = hasattr(dp, 'is_mounted') and dp.is_mounted()
# Don't bother mounting the DPs if there's no package.
if self.env["pkg_path"]:
self.jss.distribution_points.mount()
self.package = self.handle_package()
# stop if no package was uploaded and STOP_IF_NO_JSS_UPLOAD is True
if (self.env["STOP_IF_NO_JSS_UPLOAD"] is True
and not self.upload_needed):
# Done with DPs, unmount them.
for dp in self.jss.distribution_points:
if not dp.was_mounted:
self.output("Unmounting DP...")
self.jss.distribution_points.umount()
self.summarize()
return
# Build our text replacement dictionary
self.build_replace_dict()
self.extattrs = self.handle_extension_attributes()
self.groups = self.handle_groups(self.env.get("groups"))
self.exclusion_groups = self.handle_groups(
self.env.get("exclusion_groups"))
self.scripts = self.handle_scripts()
self.policy = self.handle_policy()
self.handle_icon()
# Done with DPs, unmount them.
for dp in self.jss.distribution_points:
if not dp.was_mounted:
self.jss.distribution_points.umount()
self.summarize()
# pylint: enable=too-many-instance-attributes, too-many-public-methods
if __name__ == "__main__":
processor = JSSImporter() # pylint: disable=invalid-name
processor.execute_shell()