Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@sideangleside
Created February 22, 2018 23:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sideangleside/d1ce75e0a776073868d9e31793cd6920 to your computer and use it in GitHub Desktop.
Save sideangleside/d1ce75e0a776073868d9e31793cd6920 to your computer and use it in GitHub Desktop.
from __future__ import print_function, division, absolute_import
#
# Copyright (c) 2010 - 2012 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import errno
import os
import sys
from six.moves import cStringIO
from zipfile import ZipFile, BadZipfile
from rhsm import certificate
from rct.commands import RCTCliCommand
from rct.printing import xstr
from subscription_manager.cli import InvalidCLIOptionError
from rhsm import ourjson as json
from subscription_manager.i18n import ugettext as _
def get_value(json_dict, path):
current = json_dict
for item in path.split("."):
if item in current:
current = current[item]
else:
return ""
return current
def colorize(msg):
#color = False
#if color:
# return "\033[94m%s\033[0m" % msg
#else:
return msg
class ZipExtractAll(ZipFile):
"""extend ZipFile with a safer extractall
Zipfile() does not support extractall on python2.4, and the 2.6 versions
are known to be unsafe in how they extract files. 2.6 version does not
validate that files are within the archive root, or check that files are
created safely.
Contains helper methods for manipulating and reading
the zipfile more easily in memory"""
inner_zip = None
def __init__(self, *args, **kwargs):
"""
Validates the zip file
"""
try:
ZipFile.__init__(self, *args, **kwargs)
except BadZipfile:
print(_("Manifest zip is invalid."))
sys.exit(1)
def _get_inner_zip(self):
if self.inner_zip is None:
output = cStringIO(self.read(RCTManifestCommand.INNER_FILE))
self.inner_zip = ZipExtractAll(output, 'r')
return self.inner_zip
def _read_file(self, file_path, is_inner=False):
try:
output = cStringIO(self.read(file_path))
result = output.getvalue()
output.close()
except KeyError:
try:
if is_inner:
raise KeyError
result = self._get_inner_zip()._read_file(file_path, True)
except KeyError:
raise Exception(_('Unable to find file "%s" in manifest.') % file_path)
return result
def _get_entitlements(self):
results = []
in_zip = self._get_inner_zip()
for filename in in_zip.namelist():
(read_path, read_file) = os.path.split(filename)
if (read_path == os.path.join("export", "entitlements")) and (len(read_file) > 0):
results.append(filename)
return results
def _open_excl(self, path):
return os.fdopen(os.open(path, os.O_RDWR | os.O_CREAT | os.O_EXCL), 'w')
def _write_file(self, output_path, archive_path):
outfile = self._open_excl(output_path)
outfile.write(self.read(archive_path))
outfile.close()
def _is_secure(self, base, new_file):
base_path = os.path.abspath(base)
new_path = os.path.abspath(new_file)
if not new_path.startswith(base_path):
raise Exception(_('Manifest zip attempted to extract outside of the base directory.'))
#traces symlink to source, and checks that it is valid
real_new_path = os.path.realpath(new_path)
if real_new_path != new_path:
self._is_secure(base, real_new_path)
elif os.path.islink(new_path):
raise Exception(_('Unable to trace symbolic link. Possibly circular linkage.'))
def extractall(self, location, overwrite=False):
self._is_secure(location, location)
for path_name in self.namelist():
(directory, filename) = os.path.split(path_name)
directory = os.path.join(location, directory)
self._is_secure(location, directory)
if not os.path.exists(directory):
os.makedirs(directory)
new_location = os.path.join(directory, filename)
self._is_secure(location, new_location)
if (os.path.exists(new_location) and overwrite):
os.remove(new_location)
self._write_file(new_location, path_name)
class RCTManifestCommand(RCTCliCommand):
INNER_FILE = "consumer_export.zip"
def __init__(self, name="cli", aliases=None, shortdesc=None, primary=False):
RCTCliCommand.__init__(self, name=name, aliases=aliases,
shortdesc=shortdesc, primary=primary)
def _get_usage(self):
return _("%%prog %s [OPTIONS] MANIFEST_FILE") % self.name
def _validate_options(self):
manifest_file = self._get_file_from_args()
if not manifest_file:
raise InvalidCLIOptionError(_("You must specify a manifest file."))
if not os.path.isfile(manifest_file):
raise InvalidCLIOptionError(_("The specified manifest file does not exist."))
def _extract_manifest(self, location, overwrite=False):
# Extract the outer file
archive = ZipExtractAll(self._get_file_from_args(), 'r')
archive.extractall(location, overwrite)
# now extract the inner file
if location:
inner_file = os.path.join(location, self.INNER_FILE)
else:
inner_file = self.INNER_FILE
archive = ZipExtractAll(inner_file, 'r')
archive.extractall(location, overwrite)
# Delete the intermediate file
os.remove(inner_file)
class CatManifestCommand(RCTManifestCommand):
def __init__(self):
RCTManifestCommand.__init__(self, name="cat-manifest", aliases=['cm'],
shortdesc=_("Print manifest information"),
primary=True)
self.parser.add_option("--no-content", action="store_true",
default=False,
help=_("skip printing Content Sets"))
def _print_section(self, title, items, indent=1, whitespace=True):
# Allow a bit of customization of the tabbing
pad = "\t" * (indent - 1)
print(pad + title)
pad += "\t"
for item in items:
if len(item) == 2:
print("%s%s: %s" % (pad, item[0], xstr(item[1])))
else:
print("%s%s" % (pad, item[0]))
if whitespace:
print("")
def _print_general(self, zip_archive):
# Print out general data
part = zip_archive._read_file(os.path.join("export", "meta.json"))
data = json.loads(part)
to_print = []
to_print.append((_("Server"), get_value(data, "webAppPrefix")))
to_print.append((_("Server Version"), get_value(data, "version")))
to_print.append((_("Date Created"), get_value(data, "created")))
to_print.append((_("Creator"), get_value(data, "principalName")))
self._print_section(_("General:"), to_print)
def _print_consumer(self, zip_archive):
# Print out the consumer data
part = zip_archive._read_file(os.path.join("export", "consumer.json"))
data = json.loads(part)
to_print = []
to_print.append((_("Name"), get_value(data, "name")))
to_print.append((_("UUID"), get_value(data, "uuid")))
# contentAccessMode is entitlement if null, blank or non-present
contentAccessMode = 'entitlement'
if "contentAccessMode" in data and data["contentAccessMode"] == 'org_environment':
contentAccessMode = 'org_environment'
to_print.append((_("Content Access Mode"), contentAccessMode))
to_print.append((_("Type"), get_value(data, "type.label")))
self._print_section(_("Consumer:"), to_print)
def _get_product_attribute(self, name, data):
return_value = None
for attr in get_value(data, "pool.productAttributes"):
if attr["name"] == name:
return_value = attr["value"]
break
return return_value
def _print_products(self, zip_archive):
entitlements = zip_archive._get_entitlements()
if len(entitlements) == 0:
self._print_section(_("Subscriptions:"), [["None"]], 1, True)
return
for ent_file in entitlements:
part = zip_archive._read_file(ent_file)
data = json.loads(part)
to_print = []
to_print.append((_("Name"), get_value(data, "pool.productName")))
to_print.append((_("Quantity"), get_value(data, "quantity")))
to_print.append((_("Created"), get_value(data, "created")))
to_print.append((_("Start Date"), get_value(data, "startDate")))
to_print.append((_("End Date"), get_value(data, "endDate")))
to_print.append((_("Service Level"), self._get_product_attribute("support_level", data)))
to_print.append((_("Service Type"), self._get_product_attribute("support_type", data)))
to_print.append((_("Architectures"), self._get_product_attribute("arch", data)))
to_print.append((_("SKU"), get_value(data, "pool.productId")))
to_print.append((_("Contract"), get_value(data, "pool.contractNumber")))
to_print.append((_("Order"), get_value(data, "pool.orderNumber")))
to_print.append((_("Account"), get_value(data, "pool.accountNumber")))
virt_limit = self._get_product_attribute("virt_limit", data)
to_print.append((_("Virt Limit"), virt_limit))
facts_to_print = []
sub_fact_data = ""
require_virt_who = False
if virt_limit:
require_virt_who = True
facts_to_print.append((_("VIRT_LIMIT"), "This subscription supports up to " + colorize(virt_limit) + " guests"))
else:
facts_to_print.append((_("VIRT_LIMIT"), "This subscription does not require virt-who"))
to_print.append((_("Requires Virt-who"), require_virt_who))
### Sockets
sockets = self._get_product_attribute("sockets", data)
if sockets:
to_print.append((_("Sockets"), sockets))
facts_to_print.append(
(_("SOCKETS"), "This subscription supports systems with up to " + colorize(sockets) + " sockets"))
else:
to_print.append((_("Sockets"), "Not Applicable"))
sockets = "0"
stacked = get_value(data, "pool.stacked")
to_print.append((_("Stackable"), stacked))
if stacked:
facts_to_print.append((_("STACKABLE"), "This subscription is " + colorize(
"stackable") + " to support hosts with > " + colorize(sockets) + " sockets"))
else:
facts_to_print.append((_("STACKABLE"), "This subscription is not " + colorize("stackable") + " "))
### Cores
cores = self._get_product_attribute("cores", data)
if cores:
to_print.append((_("Cores"), cores))
facts_to_print.append(
(_("CORES"), "This subscription supports systems with up to " + colorize(cores) + " cores"))
else:
to_print.append((_("Cores"), "Not Applicable"))
### Inst Multiplier
instance_multiplier = self._get_product_attribute("instance_multiplier", data)
if instance_multiplier:
to_print.append((_("Instance Multipler"), instance_multiplier))
else:
to_print.append((_("Instance Multipler"), "Not Applicable"))
### Cloud Access
cloud_access_enabled = self._get_product_attribute("cloud_access_enabled", data)
if cloud_access_enabled:
to_print.append((_("Cloud Access"), cloud_access_enabled.capitalize()))
facts_to_print.append((_("CLOUD ACCESS"), "This subscription is eligible for the " + colorize(
"Cloud Access Program") + ""))
else:
to_print.append((_("Cloud Access"), "Not Applicable"))
entitlement_file = os.path.join("export", "entitlements", "%s.json" % data["id"])
to_print.append((_("Entitlement File"), entitlement_file))
#Get the certificate to get the version
serial = data["certificates"][0]["serial"]["id"]
cert_file = os.path.join("export", "entitlement_certificates", "%s.pem" % serial)
to_print.append((_("Certificate File"), cert_file))
try:
cert = certificate.create_from_pem(zip_archive._read_file(cert_file))
except certificate.CertificateException as ce:
raise certificate.CertificateException(
_("Unable to read certificate file '%s': %s") % (cert_file,
ce))
to_print.append((_("Certificate Version"), cert.version))
self._print_section(_("Subscription:"), to_print, 1, False)
# Get the provided Products
to_print = [(int(pp["productId"]), pp["productName"]) for pp in data["pool"]["providedProducts"]]
self._print_section(_("Provided Products:"), sorted(to_print), 2, False)
# Get the derived provided Products (if available)
if "derivedProvidedProducts" in data["pool"]:
to_print = [(int(pp["productId"]), pp["productName"]) for pp in data["pool"]["derivedProvidedProducts"]]
self._print_section(_("Derived Products:"), sorted(to_print), 2, False)
self._print_section(_("Facts:"), facts_to_print, 2, False)
# Get the Content Sets
if not self.options.no_content:
for item in cert.content:
to_print.append((_("Name"), item.name))
to_print.append((_("\tLabel"), item.label))
to_print.append((_("\tURL"), item.url))
self._print_section(_("Content Sets & Labels:"), to_print, 2, True)
else: # bz#1369577: print a blank line to separate subscriptions when --no-content in use
print("")
def _do_command(self):
"""
Does the work that this command intends.
"""
temp = ZipExtractAll(self._get_file_from_args(), 'r')
# Print out the header
print("\n+-------------------------------------------+")
print(_("\tManifest"))
print("+-------------------------------------------+\n")
self._print_general(temp)
self._print_consumer(temp)
self._print_products(temp)
class DumpManifestCommand(RCTManifestCommand):
def __init__(self):
RCTManifestCommand.__init__(self, name="dump-manifest", aliases=['dm'],
shortdesc=_("Dump the contents of a manifest"),
primary=True)
self.parser.add_option("--destination", dest="destination",
help=_("directory to extract the manifest to"))
self.parser.add_option("-f", "--force", action="store_true",
dest="overwrite_files", default=False,
help=_("overwrite files which may exist"))
def _extract(self, destination, overwrite):
try:
self._extract_manifest(destination, overwrite)
except EnvironmentError as e:
# IOError/OSError base class
if e.errno == errno.EEXIST:
# useful error for file already exists
print(_('File "%s" exists. Use -f to force overwriting the file.') % e.filename)
else:
# generic error for everything else
print(_("Manifest could not be written:"))
print(e.strerror)
if e.filename:
print(e.filename)
return False
return True
def _do_command(self):
"""
Does the work that this command intends.
"""
if self.options.destination:
if self._extract(self.options.destination, self.options.overwrite_files):
print(_("The manifest has been dumped to the %s directory") % self.options.destination)
else:
if self._extract(os.getcwd(), self.options.overwrite_files):
print(_("The manifest has been dumped to the current directory"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment