Skip to content

Instantly share code, notes, and snippets.

@srid
Created January 8, 2010 19:40
Show Gist options
  • Save srid/272331 to your computer and use it in GitHub Desktop.
Save srid/272331 to your computer and use it in GitHub Desktop.
dependency.py - r940
# Copyright (c) 2009 ActiveState Software Inc.
# See http://www.activestate.com/activepython/license/ for licensing
# information.
"""
pypm.client.dependency
~~~~~~~~~~~~~~~~~~~~~~
This module contains package dependency related functionality
"""
import operator
from textwrap import dedent
import pkg_resources
import logging
from pypm.common.repository import RepoPackage
from pypm.client.base import PyPMFeature
from pypm.client import error
from pypm.client.store import InstalledPackage
from pypm.client.version import sort_packages_by_version
LOG = logging.getLogger(__name__)
#
# Package selectors
#
class BasePackageSelector(PyPMFeature):
"""Select packages based on pre-defined rules
"""
def __init__(self, *args, **kwargs):
super(BasePackageSelector, self).__init__(*args, **kwargs)
self.selected = {} # name => pkg
def select(self, pkg):
rules = []
# find rules
for method_name in dir(self):
if method_name.startswith('rule_'):
method = getattr(self, method_name)
rules.append(method)
# apply rules
blocked_by_rule = any([not rule(pkg) for rule in rules])
if not blocked_by_rule and not self.is_selected(pkg.canonical_name):
self.selected[pkg.canonical_name] = pkg
def is_selected(self, pkg_canonical_name):
return pkg_canonical_name in self.selected
def get_selected(self):
return self.selected.values()
def rule_no_version_conflict(self, pkg):
"""Cannot select packages with conflicting versions"""
if pkg.canonical_name in self.selected:
if pkg.version != self.selected[pkg.canonical_name].version:
raise ValueError, 'conflicting versions detected'
return True
class InstallablePackageSelector(BasePackageSelector):
"""Select only packages that can be installed
"""
def is_selected_or_installed(self, pkg):
"""Return True if the given package is either selected or same version
of it already installed
If a different version of the same package is already selected, we have
conflicting version requirements that cannot be reliably satisfied.
"""
if pkg.canonical_name in self.selected:
selected_ver = self.selected[pkg.canonical_name].version
if pkg.version != selected_ver:
raise ValueError, ('{0}-{1} needs to be selected, but a different ' +
'version ({2}) is already selected').format(
pkg.canonical_name, pkg.version, selected_ver)
return True
try:
ipkg = self.pypmenv.installed_store.find_only_package(pkg.canonical_name)
return pkg.version == ipkg.version # this would result in an upgrade
except error.NoPackageInstalled:
return False
def rule_same_version_not_installed(self, pkg):
try:
installed_package = self.pypmenv.installed_store.find_only_package(
pkg.canonical_name)
return not (installed_package.version == pkg.version and
installed_package.pkg_version == pkg.version)
except error.NoPackageInstalled:
return True
class RemovablePackageSelector(BasePackageSelector):
"""Select packages that can be removed
"""
pass
#
# Dependency management
#
class Dependency(PyPMFeature):
def get_available_dependencies(self, package, selector=None):
"""Return dependencies for package - that are not yet installed. Do this
recursively.
"""
LOG.debug('Finding dependencies for {0.full_name}'.format(package))
assert type(package) is RepoPackage
if not selector:
selector = InstallablePackageSelector(self.pypmenv)
# For each requirement, try to satisfy them.
# If an requirement cannot be satisfied, then installation must fail.
for requirement in _get_dependencies(package):
packages = list(self.pypmenv.repo_store.find_package_releases(
requirement.project_name))
# 1. sort by package version (see ``Package.__cmp__``) in descending order
# so that we favor the latest version of the package possible
sort_packages_by_version(packages)
# 2. pick that version only if the requirement rule matches (eg: Foo <= 2.9)
for pkg in packages:
if pkg.version in requirement:
# skip requirements that are already satisfied
# this will also avoid infinite recursion
if not selector.is_selected_or_installed(pkg):
# Select this package (an requirement) *first* before
# processing its dependencies .. thus avoiding potential
# infinite loop.
selector.select(pkg)
# Process dependencies recursively
self.get_available_dependencies(pkg, selector)
break
else:
raise error.DependencyNotFound(package.name, requirement)
return selector
def get_removable_dependents(self, installed_package):
"""Return already installed packages that, directly or indirectly, depend
upon `package`.
"""
assert type(installed_package) is InstalledPackage
selector = RemovablePackageSelector(self.pypmenv)
RDependsGraph(self.pypmenv).flatten_from(
installed_package.canonical_name, selector)
return selector
#
# Dependency graph
#
class RDependsGraph(PyPMFeature):
"""A reverse dependency graph (DAG)
NOTE: if a package X does not have any other installed package depending on
it, then it won't be part of the graph datastructure (``self.graph``). This
effectively means that X is a hanging node with no relations.
"""
def __init__(self, *a, **kw):
super(RDependsGraph, self).__init__(*a, **kw)
self._calculate()
def _calculate(self):
"""Build the graph"""
depends_graph = {}
for pkg in self.pypmenv.installed_store.find_all_packages():
assert pkg.canonical_name not in depends_graph
depends_graph[pkg.canonical_name] = [
req.project_name.lower()
for req in _get_dependencies(pkg)]
self.graph = RDependsGraph._invert_graph(depends_graph)
@staticmethod
def _invert_graph(graph):
inverted_graph = {}
for node in graph:
for dnode in graph[node]:
inverted_graph.setdefault(dnode, []).append(node)
return inverted_graph
def flatten_from(self, from_pkg_name, selector, _stack=[]):
"""Flatten the sub-tree from node ``from_pkg_name`` and select the
items, as instances of ``InstalledPackage``, of the flattened list to
``selector``.
"""
if from_pkg_name not in self.graph:
# no rdepends (i.e., no installed packages depend on `from_pkg_name`)
return selector
if selector.is_selected(from_pkg_name):
return # already processed
LOG.debug('** flatten reverse dependents:\n %s // %s',
'->'.join(_stack),
from_pkg_name)
for dep_pkg_name in self.graph[from_pkg_name]:
if dep_pkg_name in _stack:
continue # circular dependency detected
self.flatten_from(dep_pkg_name, selector, _stack+[from_pkg_name])
selector.select(
self.pypmenv.installed_store.find_only_package(dep_pkg_name))
def _get_dependencies(pkg, reqspec=None):
"""Get the dependencies of ``pkg``
TODO: If reqspec is specfied, use it
"""
if reqspec:
raise NotImplementedError, 'need to support Foo[bar]'
return [pkg_resources.Requirement.parse(reqstring)
for reqstring in pkg.install_requires['']]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment