Created
February 22, 2018 23:52
-
-
Save sideangleside/d1ce75e0a776073868d9e31793cd6920 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function, division, absolute_import | |
# | |
# Copyright (c) 2010 - 2012 Red Hat, Inc. | |
# | |
# This software is licensed to you under the GNU General Public License, | |
# version 2 (GPLv2). There is NO WARRANTY for this software, express or | |
# implied, including the implied warranties of MERCHANTABILITY or FITNESS | |
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 | |
# along with this software; if not, see | |
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. | |
# | |
# Red Hat trademarks are not licensed under GPLv2. No permission is | |
# granted to use or replicate Red Hat trademarks that are incorporated | |
# in this software or its documentation. | |
# | |
import errno | |
import os | |
import sys | |
from six.moves import cStringIO | |
from zipfile import ZipFile, BadZipfile | |
from rhsm import certificate | |
from rct.commands import RCTCliCommand | |
from rct.printing import xstr | |
from subscription_manager.cli import InvalidCLIOptionError | |
from rhsm import ourjson as json | |
from subscription_manager.i18n import ugettext as _ | |
def get_value(json_dict, path): | |
current = json_dict | |
for item in path.split("."): | |
if item in current: | |
current = current[item] | |
else: | |
return "" | |
return current | |
def colorize(msg): | |
#color = False | |
#if color: | |
# return "\033[94m%s\033[0m" % msg | |
#else: | |
return msg | |
class ZipExtractAll(ZipFile): | |
"""extend ZipFile with a safer extractall | |
Zipfile() does not support extractall on python2.4, and the 2.6 versions | |
are known to be unsafe in how they extract files. 2.6 version does not | |
validate that files are within the archive root, or check that files are | |
created safely. | |
Contains helper methods for manipulating and reading | |
the zipfile more easily in memory""" | |
inner_zip = None | |
def __init__(self, *args, **kwargs): | |
""" | |
Validates the zip file | |
""" | |
try: | |
ZipFile.__init__(self, *args, **kwargs) | |
except BadZipfile: | |
print(_("Manifest zip is invalid.")) | |
sys.exit(1) | |
def _get_inner_zip(self): | |
if self.inner_zip is None: | |
output = cStringIO(self.read(RCTManifestCommand.INNER_FILE)) | |
self.inner_zip = ZipExtractAll(output, 'r') | |
return self.inner_zip | |
def _read_file(self, file_path, is_inner=False): | |
try: | |
output = cStringIO(self.read(file_path)) | |
result = output.getvalue() | |
output.close() | |
except KeyError: | |
try: | |
if is_inner: | |
raise KeyError | |
result = self._get_inner_zip()._read_file(file_path, True) | |
except KeyError: | |
raise Exception(_('Unable to find file "%s" in manifest.') % file_path) | |
return result | |
def _get_entitlements(self): | |
results = [] | |
in_zip = self._get_inner_zip() | |
for filename in in_zip.namelist(): | |
(read_path, read_file) = os.path.split(filename) | |
if (read_path == os.path.join("export", "entitlements")) and (len(read_file) > 0): | |
results.append(filename) | |
return results | |
def _open_excl(self, path): | |
return os.fdopen(os.open(path, os.O_RDWR | os.O_CREAT | os.O_EXCL), 'w') | |
def _write_file(self, output_path, archive_path): | |
outfile = self._open_excl(output_path) | |
outfile.write(self.read(archive_path)) | |
outfile.close() | |
def _is_secure(self, base, new_file): | |
base_path = os.path.abspath(base) | |
new_path = os.path.abspath(new_file) | |
if not new_path.startswith(base_path): | |
raise Exception(_('Manifest zip attempted to extract outside of the base directory.')) | |
#traces symlink to source, and checks that it is valid | |
real_new_path = os.path.realpath(new_path) | |
if real_new_path != new_path: | |
self._is_secure(base, real_new_path) | |
elif os.path.islink(new_path): | |
raise Exception(_('Unable to trace symbolic link. Possibly circular linkage.')) | |
def extractall(self, location, overwrite=False): | |
self._is_secure(location, location) | |
for path_name in self.namelist(): | |
(directory, filename) = os.path.split(path_name) | |
directory = os.path.join(location, directory) | |
self._is_secure(location, directory) | |
if not os.path.exists(directory): | |
os.makedirs(directory) | |
new_location = os.path.join(directory, filename) | |
self._is_secure(location, new_location) | |
if (os.path.exists(new_location) and overwrite): | |
os.remove(new_location) | |
self._write_file(new_location, path_name) | |
class RCTManifestCommand(RCTCliCommand): | |
INNER_FILE = "consumer_export.zip" | |
def __init__(self, name="cli", aliases=None, shortdesc=None, primary=False): | |
RCTCliCommand.__init__(self, name=name, aliases=aliases, | |
shortdesc=shortdesc, primary=primary) | |
def _get_usage(self): | |
return _("%%prog %s [OPTIONS] MANIFEST_FILE") % self.name | |
def _validate_options(self): | |
manifest_file = self._get_file_from_args() | |
if not manifest_file: | |
raise InvalidCLIOptionError(_("You must specify a manifest file.")) | |
if not os.path.isfile(manifest_file): | |
raise InvalidCLIOptionError(_("The specified manifest file does not exist.")) | |
def _extract_manifest(self, location, overwrite=False): | |
# Extract the outer file | |
archive = ZipExtractAll(self._get_file_from_args(), 'r') | |
archive.extractall(location, overwrite) | |
# now extract the inner file | |
if location: | |
inner_file = os.path.join(location, self.INNER_FILE) | |
else: | |
inner_file = self.INNER_FILE | |
archive = ZipExtractAll(inner_file, 'r') | |
archive.extractall(location, overwrite) | |
# Delete the intermediate file | |
os.remove(inner_file) | |
class CatManifestCommand(RCTManifestCommand): | |
def __init__(self): | |
RCTManifestCommand.__init__(self, name="cat-manifest", aliases=['cm'], | |
shortdesc=_("Print manifest information"), | |
primary=True) | |
self.parser.add_option("--no-content", action="store_true", | |
default=False, | |
help=_("skip printing Content Sets")) | |
def _print_section(self, title, items, indent=1, whitespace=True): | |
# Allow a bit of customization of the tabbing | |
pad = "\t" * (indent - 1) | |
print(pad + title) | |
pad += "\t" | |
for item in items: | |
if len(item) == 2: | |
print("%s%s: %s" % (pad, item[0], xstr(item[1]))) | |
else: | |
print("%s%s" % (pad, item[0])) | |
if whitespace: | |
print("") | |
def _print_general(self, zip_archive): | |
# Print out general data | |
part = zip_archive._read_file(os.path.join("export", "meta.json")) | |
data = json.loads(part) | |
to_print = [] | |
to_print.append((_("Server"), get_value(data, "webAppPrefix"))) | |
to_print.append((_("Server Version"), get_value(data, "version"))) | |
to_print.append((_("Date Created"), get_value(data, "created"))) | |
to_print.append((_("Creator"), get_value(data, "principalName"))) | |
self._print_section(_("General:"), to_print) | |
def _print_consumer(self, zip_archive): | |
# Print out the consumer data | |
part = zip_archive._read_file(os.path.join("export", "consumer.json")) | |
data = json.loads(part) | |
to_print = [] | |
to_print.append((_("Name"), get_value(data, "name"))) | |
to_print.append((_("UUID"), get_value(data, "uuid"))) | |
# contentAccessMode is entitlement if null, blank or non-present | |
contentAccessMode = 'entitlement' | |
if "contentAccessMode" in data and data["contentAccessMode"] == 'org_environment': | |
contentAccessMode = 'org_environment' | |
to_print.append((_("Content Access Mode"), contentAccessMode)) | |
to_print.append((_("Type"), get_value(data, "type.label"))) | |
self._print_section(_("Consumer:"), to_print) | |
def _get_product_attribute(self, name, data): | |
return_value = None | |
for attr in get_value(data, "pool.productAttributes"): | |
if attr["name"] == name: | |
return_value = attr["value"] | |
break | |
return return_value | |
def _print_products(self, zip_archive): | |
entitlements = zip_archive._get_entitlements() | |
if len(entitlements) == 0: | |
self._print_section(_("Subscriptions:"), [["None"]], 1, True) | |
return | |
for ent_file in entitlements: | |
part = zip_archive._read_file(ent_file) | |
data = json.loads(part) | |
to_print = [] | |
to_print.append((_("Name"), get_value(data, "pool.productName"))) | |
to_print.append((_("Quantity"), get_value(data, "quantity"))) | |
to_print.append((_("Created"), get_value(data, "created"))) | |
to_print.append((_("Start Date"), get_value(data, "startDate"))) | |
to_print.append((_("End Date"), get_value(data, "endDate"))) | |
to_print.append((_("Service Level"), self._get_product_attribute("support_level", data))) | |
to_print.append((_("Service Type"), self._get_product_attribute("support_type", data))) | |
to_print.append((_("Architectures"), self._get_product_attribute("arch", data))) | |
to_print.append((_("SKU"), get_value(data, "pool.productId"))) | |
to_print.append((_("Contract"), get_value(data, "pool.contractNumber"))) | |
to_print.append((_("Order"), get_value(data, "pool.orderNumber"))) | |
to_print.append((_("Account"), get_value(data, "pool.accountNumber"))) | |
virt_limit = self._get_product_attribute("virt_limit", data) | |
to_print.append((_("Virt Limit"), virt_limit)) | |
facts_to_print = [] | |
sub_fact_data = "" | |
require_virt_who = False | |
if virt_limit: | |
require_virt_who = True | |
facts_to_print.append((_("VIRT_LIMIT"), "This subscription supports up to " + colorize(virt_limit) + " guests")) | |
else: | |
facts_to_print.append((_("VIRT_LIMIT"), "This subscription does not require virt-who")) | |
to_print.append((_("Requires Virt-who"), require_virt_who)) | |
### Sockets | |
sockets = self._get_product_attribute("sockets", data) | |
if sockets: | |
to_print.append((_("Sockets"), sockets)) | |
facts_to_print.append( | |
(_("SOCKETS"), "This subscription supports systems with up to " + colorize(sockets) + " sockets")) | |
else: | |
to_print.append((_("Sockets"), "Not Applicable")) | |
sockets = "0" | |
stacked = get_value(data, "pool.stacked") | |
to_print.append((_("Stackable"), stacked)) | |
if stacked: | |
facts_to_print.append((_("STACKABLE"), "This subscription is " + colorize( | |
"stackable") + " to support hosts with > " + colorize(sockets) + " sockets")) | |
else: | |
facts_to_print.append((_("STACKABLE"), "This subscription is not " + colorize("stackable") + " ")) | |
### Cores | |
cores = self._get_product_attribute("cores", data) | |
if cores: | |
to_print.append((_("Cores"), cores)) | |
facts_to_print.append( | |
(_("CORES"), "This subscription supports systems with up to " + colorize(cores) + " cores")) | |
else: | |
to_print.append((_("Cores"), "Not Applicable")) | |
### Inst Multiplier | |
instance_multiplier = self._get_product_attribute("instance_multiplier", data) | |
if instance_multiplier: | |
to_print.append((_("Instance Multipler"), instance_multiplier)) | |
else: | |
to_print.append((_("Instance Multipler"), "Not Applicable")) | |
### Cloud Access | |
cloud_access_enabled = self._get_product_attribute("cloud_access_enabled", data) | |
if cloud_access_enabled: | |
to_print.append((_("Cloud Access"), cloud_access_enabled.capitalize())) | |
facts_to_print.append((_("CLOUD ACCESS"), "This subscription is eligible for the " + colorize( | |
"Cloud Access Program") + "")) | |
else: | |
to_print.append((_("Cloud Access"), "Not Applicable")) | |
entitlement_file = os.path.join("export", "entitlements", "%s.json" % data["id"]) | |
to_print.append((_("Entitlement File"), entitlement_file)) | |
#Get the certificate to get the version | |
serial = data["certificates"][0]["serial"]["id"] | |
cert_file = os.path.join("export", "entitlement_certificates", "%s.pem" % serial) | |
to_print.append((_("Certificate File"), cert_file)) | |
try: | |
cert = certificate.create_from_pem(zip_archive._read_file(cert_file)) | |
except certificate.CertificateException as ce: | |
raise certificate.CertificateException( | |
_("Unable to read certificate file '%s': %s") % (cert_file, | |
ce)) | |
to_print.append((_("Certificate Version"), cert.version)) | |
self._print_section(_("Subscription:"), to_print, 1, False) | |
# Get the provided Products | |
to_print = [(int(pp["productId"]), pp["productName"]) for pp in data["pool"]["providedProducts"]] | |
self._print_section(_("Provided Products:"), sorted(to_print), 2, False) | |
# Get the derived provided Products (if available) | |
if "derivedProvidedProducts" in data["pool"]: | |
to_print = [(int(pp["productId"]), pp["productName"]) for pp in data["pool"]["derivedProvidedProducts"]] | |
self._print_section(_("Derived Products:"), sorted(to_print), 2, False) | |
self._print_section(_("Facts:"), facts_to_print, 2, False) | |
# Get the Content Sets | |
if not self.options.no_content: | |
for item in cert.content: | |
to_print.append((_("Name"), item.name)) | |
to_print.append((_("\tLabel"), item.label)) | |
to_print.append((_("\tURL"), item.url)) | |
self._print_section(_("Content Sets & Labels:"), to_print, 2, True) | |
else: # bz#1369577: print a blank line to separate subscriptions when --no-content in use | |
print("") | |
def _do_command(self): | |
""" | |
Does the work that this command intends. | |
""" | |
temp = ZipExtractAll(self._get_file_from_args(), 'r') | |
# Print out the header | |
print("\n+-------------------------------------------+") | |
print(_("\tManifest")) | |
print("+-------------------------------------------+\n") | |
self._print_general(temp) | |
self._print_consumer(temp) | |
self._print_products(temp) | |
class DumpManifestCommand(RCTManifestCommand): | |
def __init__(self): | |
RCTManifestCommand.__init__(self, name="dump-manifest", aliases=['dm'], | |
shortdesc=_("Dump the contents of a manifest"), | |
primary=True) | |
self.parser.add_option("--destination", dest="destination", | |
help=_("directory to extract the manifest to")) | |
self.parser.add_option("-f", "--force", action="store_true", | |
dest="overwrite_files", default=False, | |
help=_("overwrite files which may exist")) | |
def _extract(self, destination, overwrite): | |
try: | |
self._extract_manifest(destination, overwrite) | |
except EnvironmentError as e: | |
# IOError/OSError base class | |
if e.errno == errno.EEXIST: | |
# useful error for file already exists | |
print(_('File "%s" exists. Use -f to force overwriting the file.') % e.filename) | |
else: | |
# generic error for everything else | |
print(_("Manifest could not be written:")) | |
print(e.strerror) | |
if e.filename: | |
print(e.filename) | |
return False | |
return True | |
def _do_command(self): | |
""" | |
Does the work that this command intends. | |
""" | |
if self.options.destination: | |
if self._extract(self.options.destination, self.options.overwrite_files): | |
print(_("The manifest has been dumped to the %s directory") % self.options.destination) | |
else: | |
if self._extract(os.getcwd(), self.options.overwrite_files): | |
print(_("The manifest has been dumped to the current directory")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment