Last active
June 30, 2020 04:26
-
-
Save WillyPillow/61ee5f48b7c5b7cc90c9fd2ec5c1b20d to your computer and use it in GitHub Desktop.
qvm-template PoC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import datetime | |
import os | |
import subprocess | |
import sys | |
import tempfile | |
import qubesadmin | |
import rpm | |
# Directory under which template packages ship their files; it is also used
# to locate the extracted payload inside the temporary extraction directory.
PATH_PREFIX = '/var/lib/qubes/vm-templates'
# Parent directory for the per-package temporary extraction directories.
TEMP_DIR = '/var/tmp'
# A package name is this prefix followed by the template name.
PACKAGE_NAME_PREFIX = 'qubes-template-'

# Command-line interface: one operation followed by one or more templates.
parser = argparse.ArgumentParser(description='Qubes Template Manager')
parser.add_argument('operation', type=str,
    help='Operation to perform; only "install" is currently supported.')
parser.add_argument('templates', nargs='+',
    help='Paths to local template RPM files.')
parser.add_argument('--nogpgcheck', action='store_true',
    help='Accept packages that are unsigned or whose signing key is '
         'untrusted or unavailable.')
parser.add_argument('--allow-pv', action='store_true',
    help='Allow setting virt_mode to pv in configuration file.')
# NOTE: Verifying RPMs this way is prone to TOCTOU. This is okay for local
# files, but may create problems if multiple instances of `qvm-template` are
# downloading the same file, so a lock is needed in that case.
def verify_rpm(path, nogpgcheck=False, transaction_set=None):
    """Decide whether the RPM file at ``path`` is acceptable for installation.

    The package header is read through ``transaction_set`` (a fresh
    ``rpm.TransactionSet`` is created when none is given).

    Returns:
        ``True`` when the header is read without ``rpm.error`` and at least
        one of the SIGSIZE/SIGPGP/SIGGPG tags is present.
        ``nogpgcheck`` when all three of those tags are ``None``, or when
        reading fails with a "public key not trusted" / "public key not
        available" error.
        ``False`` for any other ``rpm.error``.
    """
    ts = rpm.TransactionSet() if transaction_set is None else transaction_set
    # rpm.error messages that --nogpgcheck is allowed to override.
    overridable_errors = (
        'public key not trusted',
        'public key not available',
    )
    with open(path, 'rb') as rpm_file:
        try:
            header = ts.hdrFromFdno(rpm_file)
            signature_tags = (
                rpm.RPMTAG_SIGSIZE,
                rpm.RPMTAG_SIGPGP,
                rpm.RPMTAG_SIGGPG,
            )
            # No signature-related tag at all: treat the package as unsigned.
            if all(header[tag] is None for tag in signature_tags):
                return nogpgcheck
        except rpm.error as exc:
            if str(exc) in overridable_errors:
                return nogpgcheck
            return False
    return True
def get_package_hdr(path, transaction_set=None):
    """Read and return the header of the RPM file at ``path``.

    ``transaction_set`` is the object whose ``hdrFromFdno`` is used to read
    the header; when omitted, a new ``rpm.TransactionSet`` is created.
    Exceptions raised while opening or reading the file propagate unchanged.
    """
    ts = transaction_set
    if ts is None:
        ts = rpm.TransactionSet()
    with open(path, 'rb') as rpm_file:
        return ts.hdrFromFdno(rpm_file)
def extract_rpm(name, path, target):
    """Extract the files of template ``name`` from the RPM at ``path``.

    Runs ``rpm2cpio path | cpio -idm -D target <pattern>`` so that only the
    entries under ``PATH_PREFIX/name`` are unpacked, relative to ``target``.

    Args:
        name: Template name (package name without the package prefix).
        path: Path to the RPM file; it is handed to ``rpm2cpio`` directly, so
            an unreadable file shows up as a non-zero exit status.
        target: Directory into which the files are extracted.

    Returns:
        ``True`` if both ``rpm2cpio`` and ``cpio`` exited with status 0,
        ``False`` otherwise.
    """
    # Archive member pattern; the leading '.' matches the relative paths
    # stored in the cpio stream.
    pattern = '.%s/%s/*' % (PATH_PREFIX, name)
    with subprocess.Popen(['rpm2cpio', path],
            stdout=subprocess.PIPE) as rpm2cpio:
        # `-D` is GNUism
        with subprocess.Popen([
                'cpio',
                '-idm',
                '-D',
                target,
                pattern
            ], stdin=rpm2cpio.stdout, stdout=subprocess.DEVNULL) as cpio:
            # Drop our copy of the pipe's read end so that rpm2cpio receives
            # SIGPIPE (instead of blocking forever) if cpio exits early.
            rpm2cpio.stdout.close()
            cpio_status = cpio.wait()
        # Always reap both children, even when one of them has failed.
        rpm2cpio_status = rpm2cpio.wait()
    return rpm2cpio_status == 0 and cpio_status == 0
def parse_config(path):
    """Parse a ``key=value`` file into a dictionary of strings.

    Each line is split on its first ``=``; everything before it is the key
    and everything after it (further ``=`` included) is the value. Only the
    trailing newline is removed, other whitespace is kept as-is. Blank lines
    are skipped. When a key occurs more than once, the last value wins.

    Raises:
        ValueError: If a non-blank line contains no ``=``.
    """
    config = {}
    with open(path, 'r') as f:
        for lineno, raw_line in enumerate(f, start=1):
            line = raw_line.rstrip('\n')
            if not line.strip():
                # Tolerate empty lines (e.g. a trailing blank line).
                continue
            key, sep, value = line.partition('=')
            if not sep:
                raise ValueError(
                    '%s:%d: expected \'key=value\', got %r'
                    % (path, lineno, line))
            config[key] = value
    return config
def install(args, app):
    """Install every local template RPM listed in ``args.templates``.

    All packages are validated first (``.rpm`` suffix, file exists, passes
    ``verify_rpm``), so nothing is installed when any of them is rejected.
    Each package is then extracted into a temporary directory below
    ``TEMP_DIR``, handed to ``qvm-template-postprocess post-install``, and
    the resulting template gets its ``template-*`` features set.

    Args:
        args: Parsed command-line namespace; ``templates``, ``nogpgcheck``
            and ``allow_pv`` are read.
        app: Object whose ``domains`` mapping yields the installed template
            by name.

    Validation, naming and extraction problems are reported through
    ``parser.error``. A failing ``qvm-template-postprocess`` raises
    ``subprocess.CalledProcessError``.
    """
    transaction_set = rpm.TransactionSet()

    for template in args.templates:
        if not template.endswith('.rpm'):
            parser.error('Only local RPMs are currently supported.')
        if not os.path.exists(template):
            parser.error('RPM file \'%s\' not found.' % template)
        if not verify_rpm(template, args.nogpgcheck, transaction_set):
            parser.error('Package \'%s\' verification failed.' % template)

    for template in args.templates:
        # Reuse the transaction set instead of creating one per package.
        package_hdr = get_package_hdr(template, transaction_set)
        package_name = package_hdr[rpm.RPMTAG_NAME]
        if not package_name.startswith(PACKAGE_NAME_PREFIX):
            parser.error(
                'Illegal package name for package \'%s\'.' % template)
        # Remove prefix to get the real template name
        name = package_name[len(PACKAGE_NAME_PREFIX):]

        with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target:
            # Do not post-process a partially extracted template.
            if not extract_rpm(name, template, target):
                parser.error(
                    'Failed to extract package \'%s\'.' % template)

            cmdline = [
                'qvm-template-postprocess',
                '--keep-source',
                '--really',
                '--no-installed-by-rpm',
            ]
            if args.allow_pv:
                cmdline.append('--allow-pv')
            # PATH_PREFIX is absolute, so plain concatenation (rather than
            # os.path.join) is needed to keep it below the temporary target.
            subprocess.check_call(cmdline + [
                'post-install',
                name,
                target + PATH_PREFIX + '/' + name])

            tpl = app.domains[name]
            tpl.features['template-version'] = \
                package_hdr[rpm.RPMTAG_VERSION]
            tpl.features['template-release'] = \
                package_hdr[rpm.RPMTAG_RELEASE]
            tpl.features['template-install-date'] = \
                str(datetime.datetime.today())
            tpl.features['template-name'] = name
def main(args=None, app=None):
    """Command-line entry point.

    ``args`` is the argument list given to the parser (``None`` lets
    argparse read ``sys.argv``). ``app`` is the Qubes application object; a
    ``qubesadmin.Qubes()`` instance is created when it is omitted.

    Returns 0 after a successful ``install``; any other operation is
    rejected through ``parser.error``.
    """
    parsed = parser.parse_args(args)
    qubes_app = qubesadmin.Qubes() if app is None else app

    if parsed.operation != 'install':
        parser.error('Operation \'%s\' not supported.' % parsed.operation)
    else:
        install(parsed, qubes_app)

    return 0


# Run the tool only when executed as a script, using main()'s return value
# as the process exit status.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment