Skip to content

Instantly share code, notes, and snippets.

@mswart
Created October 25, 2017 09:14
Show Gist options
  • Save mswart/7630850790839230213df89e88171ad6 to your computer and use it in GitHub Desktop.
Save mswart/7630850790839230213df89e88171ad6 to your computer and use it in GitHub Desktop.
Filter aptly snapshot with libsolv to selected a consistent set of dependencies like apt would do
#!/usr/bin/python3
import argparse
import json
import os
import re
import signal
import subprocess
import sys
import time
from urllib.request import Request, urlopen
import solv
parser = argparse.ArgumentParser(description='Filter multiple aptly snapshot into on specific one')
parser.add_argument('--description',
default='filtered source snapshots',
help='free-format description how snapshot has been created')
parser.add_argument('--cache-dir',
help='cache dir with pre-executed aptly snapshot search runs')
parser.add_argument('dest',
help='name of the snapshot to create')
parser.add_argument('snapshots', metavar='SNAPSHOT', nargs='+',
help='Name of aptly source snapshot')
parser.add_argument('--install', action='append',
help='package that should be installed')
parser.add_argument('--aptly-config', default=None,
help='specific aptly configuration file')
parser.add_argument('--verbose', action='store_true',
help='More output about solving result')
def process_deps(str, pool, add_callback, limit_to=None):
deps_str = str.split(',')
package_names = []
for and_deps in deps_str:
if not and_deps.strip():
continue
or_deps_str = and_deps.split('|')
or_deps = None
for or_dep in or_deps_str:
dep = or_dep.strip()
if not dep:
continue
if '(' in dep:
name, version = dep[:-1].split('(')
else:
name = dep
version = None
name = name.strip()
if name.endswith(':any'):
package_names.append(name[:-4])
if limit_to is not None and name[:-4] not in limit_to:
continue
name = pool.str2id(name[:-4])
id = pool.rel2id(name, solv.ARCH_ANY, 25) # solv.REL_MULTIARCH
else:
package_names.append(name)
if limit_to is not None and name not in limit_to:
continue
id = pool.str2id(name)
if version:
op, ver = version.split(' ')
version_id = pool.str2id(ver)
if op == '>=':
flags = solv.REL_GT | solv.REL_EQ
elif op == '>>':
flags = solv.REL_GT
elif op == '<=':
flags = solv.REL_LT | solv.REL_EQ
elif op == '<<':
flags = solv.REL_LT
elif op == '=':
flags = solv.REL_EQ
else:
raise ValueError('operator {} not implemented'.format(op))
id = pool.rel2id(id, version_id, flags)
if or_deps:
or_deps = pool.rel2id(id, or_deps, solv.REL_OR)
else:
or_deps = id
if or_deps is not None:
add_callback(or_deps)
return package_names
def build_aptly_format():
def field(name):
return name + ': {{.' + name + '}}'
def nullfield(name):
return name + ': {{if .' + name + '}}{{.' + name + '}}{{else}} {{end}}'
fields = [
field('Package'),
field('Version'),
field('Architecture'),
field('Priority'),
nullfield('Depends'),
nullfield('PreDepends'),
nullfield('Provides'),
nullfield('Recommends'),
nullfield('Suggests'),
nullfield('Replaces'),
nullfield('Breaks'),
nullfield('Conflicts'),
nullfield('Enhances'),
field('Key'),
]
return '\n'.join(fields) + '\n'
def aptly(command, subcommand=None, *arguments, **options):
argv = ['aptly']
if args.aptly_config:
argv.append('-config=' + args.aptly_config)
argv.append(command)
if subcommand:
argv.append(subcommand)
for option, value in options.items():
argv.append('-{}={}'.format(option, value))
argv.extend(arguments)
return argv
def snapshot2repo(pool, name, pkg_list):
repo = pool.add_repo(name)
data = repo.add_repodata()
for pkg_desc in pkg_list.strip().split('\n\n'):
pkg = repo.add_solvable()
pkg.name, pkg.evr, pkg.arch, priority, \
depends, predepends, provides, recommends, suggests, \
replaces, breaks, conflics, enhances, \
aptlyid = re.sub(r'^.+: ', '', pkg_desc,
flags=re.MULTILINE).split('\n')
process_deps(predepends, pool,
lambda d: pkg.add_requires(d, solv.SOLVABLE_PREREQMARKER))
process_deps(recommends, pool, pkg.add_recommends)
process_deps(depends, pool,
lambda d: pkg.add_requires(d, -solv.SOLVABLE_PREREQMARKER))
process_deps(suggests, pool, pkg.add_suggests)
process_deps(provides, pool, pkg.add_provides)
process_deps(breaks, pool, pkg.add_conflicts)
conflict_pkgs = process_deps(conflics, pool, pkg.add_conflicts)
process_deps(enhances, pool, pkg.add_enhances)
# we ignore replaces that have no corresponding conflicts
# they are only relevant for dpkg (file dublication)
process_deps(replaces, pool, pkg.add_obsoletes, limit_to=conflict_pkgs)
pkg.add_provides(pool.rel2id(pkg.nameid, pkg.evrid, solv.REL_EQ,
create=True))
data.set_str(pkg.id, solv.SOLVABLE_PKGID, aptlyid)
data.set_str(pkg.id, solv.SOLVABLE_PATCHCATEGORY, priority)
data.internalize()
return repo
args = parser.parse_args()
pool = solv.Pool()
pool.setarch('amd64')
repos = []
for snapshot in args.snapshots:
if args.cache_dir and os.path.isfile(os.path.join(args.cache_dir, snapshot)):
with open(os.path.join(args.cache_dir, snapshot), 'r') as cache_file:
pkg_list = cache_file.read()
else:
pkg_list = subprocess.run(
aptly('snapshot', 'search',
snapshot, '$PackageType (% deb)',
format=build_aptly_format()),
check=True,
stdout=subprocess.PIPE).stdout
repos.append(snapshot2repo(pool, snapshot, pkg_list.decode('utf-8')))
pool.createwhatprovides()
sel = pool.Selection()
for install in args.install:
di = pool.Dataiterator(solv.SOLVABLE_NAME, install,
solv.Dataiterator.SEARCH_STRING)
for d in di:
sel.add_raw(solv.Job.SOLVER_SOLVABLE, d.solvid)
di = pool.Dataiterator(solv.SOLVABLE_PATCHCATEGORY, 'required',
solv.Dataiterator.SEARCH_STRING)
for d in di:
sel.add_raw(solv.Job.SOLVER_SOLVABLE, d.solvid)
di = pool.Dataiterator(solv.SOLVABLE_PATCHCATEGORY, 'important',
solv.Dataiterator.SEARCH_STRING)
for d in di:
sel.add_raw(solv.Job.SOLVER_SOLVABLE, d.solvid)
jobs = sel.jobs(solv.Job.SOLVER_INSTALL)
solver = pool.Solver()
while True:
problems = solver.solve(jobs)
if not problems:
break
for problem in problems:
print("Problem %d/%d:" % (problem.id, len(problems)))
print(problem)
solutions = problem.solutions()
for solution in solutions:
print(" Solution %d:" % solution.id)
elements = solution.elements(True)
for element in elements:
print(" - %s" % element.str())
print('')
print('"the"? problem rule:')
print(problem.findproblemrule().info().problemstr())
print('All problem rules:')
for rule in problem.findallproblemrules():
print('- ' + rule.info().problemstr())
exit(3)
# no problems, show transaction
trans = solver.transaction()
del solver
if trans.isempty():
print("Nothing to do.")
sys.exit(0)
print('')
print("Transaction summary:")
print('')
packages = []
for cl in trans.classify(solv.Transaction.SOLVER_TRANSACTION_SHOW_OBSOLETES |
solv.Transaction.SOLVER_TRANSACTION_OBSOLETE_IS_UPGRADE):
if not args.verbose:
pass
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_ERASE:
print("%d erased packages:" % cl.count)
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_INSTALL:
print("%d installed packages:" % cl.count)
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_REINSTALLED:
print("%d reinstalled packages:" % cl.count)
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_DOWNGRADED:
print("%d downgraded packages:" % cl.count)
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_CHANGED:
print("%d changed packages:" % cl.count)
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_UPGRADED:
print("%d upgraded packages:" % cl.count)
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_VENDORCHANGE:
print("%d vendor changes from '%s' to '%s':" % (cl.count, cl.fromstr, cl.tostr))
elif cl.type == solv.Transaction.SOLVER_TRANSACTION_ARCHCHANGE:
print("%d arch changes from '%s' to '%s':" % (cl.count, cl.fromstr, cl.tostr))
else:
continue
for p in cl.solvables():
packages.append(p.lookup_str(solv.SOLVABLE_PKGID))
continue
if cl.type == solv.Transaction.SOLVER_TRANSACTION_UPGRADED \
or cl.type == solv.Transaction.SOLVER_TRANSACTION_DOWNGRADED:
op = trans.othersolvable(p)
print(" - %s -> %s" % (p, op))
else:
print(" - %s" % p)
print('')
print("install size change: %d K" % trans.calc_installsizechange())
print('')
payload = {
'Name': args.dest,
'Description': args.description,
'SourceSnapshots': args.snapshots,
'PackageRefs': packages
}
payload = json.dumps(payload, indent=4)
# launch aptly api
try:
proc = subprocess.Popen(aptly('api', 'serve', listen='127.0.0.1:30959'))
time.sleep(0.5)
# create new snapshot
request = Request(
url='http://127.0.0.1:30959/api/snapshots',
data=payload.encode('utf-8'),
headers={'Content-Type': 'application/json'})
response = urlopen(request)
print(response.status, response.reason)
finally:
proc.send_signal(signal.SIGTERM)
proc.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment