Skip to content

Instantly share code, notes, and snippets.

@haraldh
Created January 20, 2016 16:43
Show Gist options
  • Save haraldh/010e1d4289904dd09b83 to your computer and use it in GitHub Desktop.
Save haraldh/010e1d4289904dd09b83 to your computer and use it in GitHub Desktop.
#### modified repoquery.py for graphviz .dot generation
# replace /usr/lib/python3.5/site-packages/dnf-plugins/repoquery.py
#
# example:
# $ dnf -q --disablerepo=* --enablerepo=rawhide repoquery \
# --arch=noarch,x86_64 --requires --recursive bash systemd kernel dracut bash
#
#
# Copyright (C) 2014 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import absolute_import
from __future__ import unicode_literals
from datetime import datetime
from dnfpluginscore import _
import argparse
import dnf
import dnf.cli
import dnf.exceptions
import dnf.subject
import dnfpluginscore
import functools
import hawkey
import re
import textwrap
import networkx as nx
QFORMAT_DEFAULT = '%{name}-%{epoch}:%{version}-%{release}.%{arch}'
# matches %[-][dd]{attr}
QFORMAT_MATCH = re.compile(r'%([-\d]*?){([:\.\w]*?)}')
QUERY_INFO = """\
Name : {0.name}
Version : {0.version}
Release : {0.release}
Architecture: {0.arch}
Size : {0.size}
License : {0.license}
Source RPM : {0.sourcerpm}
Build Date : {0.buildtime}
Packager : {0.packager}
URL : {0.url}
Summary : {0.summary}
Description :
{0.description}\n"""
QUERY_TAGS = """
name, arch, epoch, version, release, reponame (repoid), evr
installtime, buildtime, size, downloadsize, installsize
provides, requires, obsoletes, conflicts, sourcerpm
description, summary, license, url
"""
def build_format_fn(opts):
if opts.queryinfo:
return info_format
elif opts.queryfilelist:
return filelist_format
elif opts.querysourcerpm:
return sourcerpm_format
else:
return rpm2py_format(opts.queryformat).format
def info_format(pkg):
return QUERY_INFO.format(pkg)
def filelist_format(pkg):
return pkg.files
def sourcerpm_format(pkg):
return pkg.sourcerpm
def parse_arguments(args):
# Setup ArgumentParser to handle util
parser = dnfpluginscore.ArgumentParser(RepoQueryCommand.aliases[0])
parser.add_argument('key', nargs='*',
help=_('the key to search for'))
parser.add_argument('--repo', metavar='REPO', action='append',
help=_('show only results from this REPO'))
# make --repoid hidden compatibility alias for --repo
parser.add_argument('--repoid', dest='repo', action='append',
help=argparse.SUPPRESS)
parser.add_argument('--arch', metavar='ARCH',
help=_('show only results from this ARCH'))
parser.add_argument('-f', '--file', metavar='FILE',
help=_('show only results that owns FILE'))
parser.add_argument('--whatprovides', metavar='REQ',
help=_('show only results there provides REQ'))
parser.add_argument('--whatrequires', metavar='REQ',
help=_('show only results there require REQ'))
parser.add_argument('--whatrecommends', metavar='REQ',
help=_('show only results that recommend REQ'))
parser.add_argument('--whatenhances', metavar='REQ',
help=_('show only results that enhance REQ'))
parser.add_argument('--whatsuggests', metavar='REQ',
help=_('show only results that suggest REQ'))
parser.add_argument('--whatsupplements', metavar='REQ',
help=_('show only results that supplement REQ'))
parser.add_argument("--alldeps", action="store_true",
help="shows results that requires package provides and files")
parser.add_argument('--querytags', action='store_true',
help=_('show available tags to use with '
'--queryformat'))
parser.add_argument('--resolve', action='store_true',
help=_('resolve capabilities to originating package(s)'))
parser.add_argument('--recursive', action='store_true',
help=_('recursively resolve capabilities to originating package(s)'))
parser.add_argument("--tree", action="store_true",
help=_('show recursive tree for package(s)'))
parser.add_argument('--srpm', action='store_true',
help=_('operate on corresponding source RPM'))
outform = parser.add_mutually_exclusive_group()
outform.add_argument('-i', "--info", dest='queryinfo',
default=False, action='store_true',
help=_('show detailed information about the package'))
outform.add_argument('-l', "--list", dest='queryfilelist',
default=False, action='store_true',
help=_('show list of files in the package'))
outform.add_argument('-s', "--source", dest='querysourcerpm',
default=False, action='store_true',
help=_('show package source RPM name'))
outform.add_argument('--qf', "--queryformat", dest='queryformat',
default=QFORMAT_DEFAULT,
help=_('format for displaying found packages'))
outform.add_argument("--latest-limit", dest='latest_limit', type=int,
help=_('show N latest packages for a given name.arch'
' (or latest but N if N is negative)'))
pkgfilter = parser.add_mutually_exclusive_group()
pkgfilter.add_argument("--duplicated", dest='pkgfilter',
const='duplicated', action='store_const',
help=_('limit the query to installed duplicated packages'))
pkgfilter.add_argument("--installonly", dest='pkgfilter',
const='installonly', action='store_const',
help=_('limit the query to installed installonly packages'))
pkgfilter.add_argument("--unsatisfied", dest='pkgfilter',
const='unsatisfied', action='store_const',
help=_('limit the query to installed packages with unsatisfied dependencies'))
package_atribute = parser.add_mutually_exclusive_group()
help_msgs = {
'conflicts': _('Display capabilities that the package conflicts with.'),
'enhances': _('Display capabilities that the package can enhance.'),
'obsoletes': _('Display capabilities that the package obsoletes.'),
'provides': _('Display capabilities provided by the package.'),
'recommends': _('Display capabilities that the package recommends.'),
'requires': _('Display capabilities that the package depends on.'),
'suggests': _('Display capabilities that the package suggests.'),
'supplements': _('Display capabilities that the package can supplement.')
}
for arg in ('conflicts', 'enhances', 'obsoletes', 'provides', 'recommends',
'requires', 'suggests', 'supplements'):
name = '--%s' % arg
package_atribute.add_argument(name, dest='packageatr', action='store_const',
const=arg, help=help_msgs[arg])
help_list = {
'available': _('Display only available packages.'),
'installed': _('Display only installed packages.'),
'extras': _('Display only packages that are not present in any of available repositories.'),
'upgrades': _('Display only packages that provide an upgrade for some already installed package.'),
'autoremove': _('Display only packages that can be autoremoved.'),
'recent': _('Display only recently edited packages')
}
list_group = parser.add_mutually_exclusive_group()
for list_arg in ('available', 'installed', 'extras', 'upgrades', 'autoremove',
'recent'):
switch = '--%s' % list_arg
list_group.add_argument(switch, dest='list', action='store_const',
const=list_arg, help=help_list[list_arg])
return parser.parse_args(args), parser
def rpm2py_format(queryformat):
"""Convert a rpm like QUERYFMT to an python .format() string."""
def fmt_repl(matchobj):
fill = matchobj.groups()[0]
key = matchobj.groups()[1]
if fill:
if fill[0] == '-':
fill = '>' + fill[1:]
else:
fill = '<' + fill
fill = ':' + fill
return '{0.' + key.lower() + fill + "}"
queryformat = queryformat.replace("\\n", "\n")
queryformat = queryformat.replace("\\t", "\t")
fmt = re.sub(QFORMAT_MATCH, fmt_repl, queryformat)
return fmt
class RepoQuery(dnf.Plugin):
name = 'Query'
def __init__(self, base, cli):
super(RepoQuery, self).__init__(base, cli)
self.base = base
self.cli = cli
if self.cli is not None:
self.cli.register_command(RepoQueryCommand)
class RepoQueryCommand(dnf.cli.Command):
"""The util command there is extending the dnf command line."""
aliases = ('repoquery',)
summary = _('search for packages matching keyword')
usage = _('[OPTIONS] [KEYWORDS]')
@staticmethod
def by_dep(sack, pattern, query, dep):
try:
reldep = hawkey.Reldep(sack, pattern)
except hawkey.ValueException:
return query.filter(empty=True)
kwarg = {dep: reldep}
return query.filter(**kwarg)
@staticmethod
def filter_repo_arch(opts, query):
"""Filter query by repoid and arch options"""
if opts.repo:
query = query.filter(reponame=opts.repo)
if opts.arch:
archs = [arch.strip() for arch in opts.arch.split(",")]
query = query.filter(arch=archs)
return query
def configure(self, args):
(self.opts, self.parser) = parse_arguments(args)
demands = self.cli.demands
if self.opts.help_cmd or self.opts.querytags:
return
if self.opts.srpm:
dnfpluginscore.lib.enable_source_repos(self.base.repos)
if self.opts.pkgfilter != "installonly" and \
self.opts.list != "installed":
demands.available_repos = True
demands.sack_activation = True
def by_all_deps(self, name, query):
defaultquery = query.filter(name=name)
allpkgs = set()
requiresquery = query.filter(requires__glob=name)
for reqpkg in requiresquery.run():
allpkgs.add(reqpkg)
for pkg in defaultquery.run():
for provide in pkg.provides:
providequery = query.filter(requires=provide)
for needsprovidepkg in providequery.run():
allpkgs.add(needsprovidepkg)
alldepsquery = query.filter(pkg=allpkgs)
return alldepsquery
def installonly(self, q):
installonly = q.installed().filter(
provides__glob=self.base.conf.installonlypkgs)
return installonly
def run(self, args):
if self.opts.help_cmd:
print(self.parser.format_help())
return
if self.opts.querytags:
print(_('Available query-tags: use --queryformat ".. %{tag} .."'))
print(QUERY_TAGS)
return
q = self.base.sack.query()
if self.opts.key:
pkgs = []
for key in self.opts.key:
q = dnf.subject.Subject(key, ignore_case=True).get_best_query(
self.base.sack, with_provides=False)
pkgs += q.run()
q = self.base.sack.query().filter(pkg=pkgs)
if self.opts.list == "recent":
q.recent(self.base.conf.recent)
elif self.opts.list == "autoremove":
q = q.unneeded(self.base.sack, self.base.yumdb)
elif self.opts.list:
q = getattr(q, self.opts.list)()
if self.opts.pkgfilter == "duplicated":
installonly = self.installonly(q)
exclude = [pkg.name for pkg in installonly]
q = q.filter(name__neq=exclude).duplicated()
elif self.opts.pkgfilter == "installonly":
q = self.installonly(q)
elif self.opts.pkgfilter == "unsatisfied":
rpmdb = dnf.sack.rpmdb_sack(self.base)
goal = dnf.goal.Goal(rpmdb)
solved = goal.run(verify=True)
if not solved:
for msg in goal.problems:
print(msg)
return
elif self.opts.pkgfilter == "unsatisfied":
# do not show packages from @System repo
q = q.available()
# filter repo and arch
q = self.filter_repo_arch(self.opts, q)
orquery = q
if self.opts.file:
q = q.filter(file=self.opts.file)
if self.opts.whatprovides:
q = q.filter(provides__glob=[self.opts.whatprovides])
if self.opts.alldeps:
if not self.opts.whatrequires:
raise dnf.exceptions.Error(
_("--alldeps requires --whatrequires option.\n"
"usage: dnf repoquery [--whatrequires] [key] [--alldeps]\n\n"))
q = self.by_all_deps(self.opts.whatrequires, q)
elif self.opts.whatrequires:
q = q.filter(requires__glob=self.opts.whatrequires)
if self.opts.whatrecommends:
q = self.by_dep(self.base.sack, self.opts.whatrecommends, q,
'recommends')
if self.opts.whatenhances:
q = self.by_dep(self.base.sack, self.opts.whatenhances, q,
'enhances')
if self.opts.whatsupplements:
q = self.by_dep(self.base.sack, self.opts.whatsupplements, q,
'supplements')
if self.opts.whatsuggests:
q = self.by_dep(self.base.sack, self.opts.whatsuggests, q,
'suggests')
if self.opts.latest_limit:
q = q.latest(self.opts.latest_limit)
if self.opts.srpm:
pkg_list = []
for pkg in q:
pkg = pkg.sourcerpm
if (pkg is not None):
tmp_query = self.base.sack.query().filter(nevra=pkg[:-4])
pkg_list += tmp_query.run()
q = self.base.sack.query().filter(pkg=pkg_list)
fmt_fn = build_format_fn(self.opts)
if self.opts.tree:
if not self.opts.whatrequires and not self.opts.packageatr:
raise dnf.exceptions.Error(
_("No switch specified\nusage: dnf repoquery [--whatrequires|"
"--requires|--conflicts|--obsoletes|--enhances|--suggest|"
"--provides|--suplements|--recommends] [key] [--tree]\n\n"
"description:\n For the given packages print a tree of the packages."))
self.tree_seed(q, orquery, self.opts)
return
if self.opts.recursive:
if not self.opts.whatrequires and not self.opts.packageatr:
raise dnf.exceptions.Error(
_("No switch specified\nusage: dnf repoquery [--whatrequires|"
"--requires|--conflicts|--obsoletes|--enhances|--suggest|"
"--provides|--suplements|--recommends] [key] [--tree]\n\n"
"description:\n For the given packages print a tree of the packages."))
DG=nx.DiGraph()
self.rec_seed(q, orquery, self.opts, DG)
# for i in ( "glibc", "bash", "systemd", "libgcc", "zlib", "libselinux" ):
# DG.remove_edges_from(DG.in_edges(i))
nx.write_dot(DG, "/dev/stdout")
return
pkgs = set()
if self.opts.packageatr:
for pkg in q.run():
rels = getattr(pkg, self.opts.packageatr)
for rel in rels:
pkgs.add(str(rel))
else:
for pkg in q.run():
po = PackageWrapper(pkg)
try:
pkgs.add(fmt_fn(po))
except AttributeError as e:
# catch that the user has specified attributes
# there don't exist on the dnf Package object.
raise dnf.exceptions.Error(str(e))
if self.opts.resolve:
# find the providing packages and show them
query = self.filter_repo_arch(
self.opts, self.base.sack.query().available())
providers = query.filter(provides__glob=list(pkgs))
pkgs = set()
for pkg in providers.latest().run():
po = PackageWrapper(pkg)
try:
pkgs.add(fmt_fn(po))
except AttributeError as e:
# catch that the user has specified attributes
# there don't exist on the dnf Package object.
raise dnf.exceptions.Error(str(e))
for pkg in sorted(pkgs):
print(pkg)
def grow_tree(self, level, pkg):
if level == -1:
print(pkg)
return
spacing = " "
for x in range(0, level):
spacing += "| "
requires = []
for reqirepkg in pkg.requires:
requires.append(str(reqirepkg))
reqstr = "[" + str(len(requires)) + ": " + ", ".join(requires) + "]"
print(spacing + "\_ " + str(pkg) + " " + reqstr)
def tree_seed(self, query, aquery, opts, level=-1, usedpkgs=None):
for pkg in sorted(set(query.run()), key=lambda p: p.name):
usedpkgs = set() if usedpkgs is None or level is -1 else usedpkgs
if pkg.name.startswith("rpmlib") or pkg.name.startswith("solvable"):
return
self.grow_tree(level, pkg)
if pkg not in usedpkgs:
usedpkgs.add(pkg)
if opts.packageatr:
strpkg = getattr(pkg, opts.packageatr)
ar = {}
for name in set(strpkg):
pkgquery = self.base.sack.query().filter(provides=name)
for querypkg in pkgquery:
ar[querypkg.name + "." + querypkg.arch] = querypkg
pkgquery = self.base.sack.query().filter(
pkg=list(ar.values()))
else:
pkgquery = self.by_all_deps(pkg.name, aquery) if opts.alldeps else aquery.filter(
requires__glob=pkg.name)
self.tree_seed(pkgquery, aquery, opts, level + 1, usedpkgs)
def rec_seed(self, query, aquery, opts, digraph, level=-1, usedpkgs=None, parent=None):
usedpkgs = set() if usedpkgs is None or level is -1 else usedpkgs
for pkg in sorted(set(query.run()), key=lambda p: p.name):
if pkg.name.startswith("rpmlib") or pkg.name.startswith("solvable"):
return
if parent:
digraph.add_node(pkg.name)
digraph.add_edge(parent.name, pkg.name)
if not pkg.installed and pkg not in usedpkgs:
usedpkgs.add(pkg)
digraph.add_node(pkg.name)
strpkg = getattr(pkg, "requires") + getattr(pkg, "recommends")
ar = list()
for name in set(strpkg):
if opts.arch:
pkgquery = self.base.sack.query().filter(provides=name, arch=[arch.strip() for arch in opts.arch.split(",")])
else:
pkgquery = self.base.sack.query().filter(provides=name)
ar.extend(pkgquery)
pkgquery = self.base.sack.query().filter(pkg=ar)
self.rec_seed(pkgquery, aquery, opts, digraph, level + 1, usedpkgs, parent=pkg)
class PackageWrapper(object):
"""Wrapper for dnf.package.Package, so we can control formatting."""
def __init__(self, pkg):
self._pkg = pkg
def __getattr__(self, attr):
atr = getattr(self._pkg, attr)
if isinstance(atr, list):
return '\n'.join(sorted([dnf.i18n.ucd(reldep) for reldep in atr]))
return dnf.i18n.ucd(atr)
@staticmethod
def _get_timestamp(timestamp):
if timestamp > 0:
dt = datetime.utcfromtimestamp(timestamp)
return dt.strftime("%Y-%m-%d %H:%M")
else:
return ''
@property
def buildtime(self):
return self._get_timestamp(self._pkg.buildtime)
@property
def installtime(self):
return self._get_timestamp(self._pkg.installtime)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment