Created
January 8, 2010 19:40
-
-
Save srid/272331 to your computer and use it in GitHub Desktop.
dependency.py - r940
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2009 ActiveState Software Inc. | |
# See http://www.activestate.com/activepython/license/ for licensing | |
# information. | |
""" | |
pypm.client.dependency | |
~~~~~~~~~~~~~~~~~~~~~~ | |
This module contains package dependency related functionality | |
""" | |
import operator | |
from textwrap import dedent | |
import pkg_resources | |
import logging | |
from pypm.common.repository import RepoPackage | |
from pypm.client.base import PyPMFeature | |
from pypm.client import error | |
from pypm.client.store import InstalledPackage | |
from pypm.client.version import sort_packages_by_version | |
LOG = logging.getLogger(__name__) | |
# | |
# Package selectors | |
# | |
class BasePackageSelector(PyPMFeature):
    """Select packages based on pre-defined rules.

    A rule is any attribute of the selector whose name starts with
    ``rule_``. Each rule is called with the candidate package; a false
    return value from any rule blocks the selection, and a rule may also
    raise to abort the selection altogether.
    """

    def __init__(self, *args, **kwargs):
        super(BasePackageSelector, self).__init__(*args, **kwargs)
        self.selected = {}  # canonical name => pkg

    def select(self, pkg):
        """Add ``pkg`` to the selection unless a rule blocks it.

        A package whose canonical name is already selected is left alone
        (the first selected package wins). Exceptions raised by rules
        propagate to the caller.
        """
        # Rules are discovered dynamically so subclasses only need to
        # define additional ``rule_*`` methods.
        rules = [getattr(self, method_name)
                 for method_name in dir(self)
                 if method_name.startswith('rule_')]

        # A list (not a generator) is built on purpose: every rule runs,
        # so a rule that raises is never skipped by short-circuiting.
        blocked_by_rule = any([not rule(pkg) for rule in rules])
        if not blocked_by_rule and not self.is_selected(pkg.canonical_name):
            self.selected[pkg.canonical_name] = pkg

    def is_selected(self, pkg_canonical_name):
        """Return True if a package with this canonical name is selected."""
        return pkg_canonical_name in self.selected

    def get_selected(self):
        """Return the selected package objects."""
        return self.selected.values()

    def rule_no_version_conflict(self, pkg):
        """Cannot select packages with conflicting versions.

        Raises ``ValueError`` if a different version of ``pkg`` is already
        selected; otherwise returns True.
        """
        if pkg.canonical_name in self.selected:
            if pkg.version != self.selected[pkg.canonical_name].version:
                # Call-form ``raise`` is valid on both Python 2 and 3.
                raise ValueError('conflicting versions detected')
        return True
class InstallablePackageSelector(BasePackageSelector):
    """Select only packages that can be installed.
    """

    def is_selected_or_installed(self, pkg):
        """Return True if the given package is either selected or same version
        of it already installed

        If a different version of the same package is already selected, we have
        conflicting version requirements that cannot be reliably satisfied;
        ``ValueError`` is raised in that case.
        """
        if pkg.canonical_name in self.selected:
            selected_ver = self.selected[pkg.canonical_name].version
            if pkg.version != selected_ver:
                # Call-form ``raise`` is valid on both Python 2 and 3.
                raise ValueError(
                    ('{0}-{1} needs to be selected, but a different ' +
                     'version ({2}) is already selected').format(
                        pkg.canonical_name, pkg.version, selected_ver))
            return True

        try:
            ipkg = self.pypmenv.installed_store.find_only_package(
                pkg.canonical_name)
        except error.NoPackageInstalled:
            return False
        # A differing installed version yields False, i.e. the caller goes
        # on to select ``pkg`` (this would result in an upgrade).
        return pkg.version == ipkg.version

    def rule_same_version_not_installed(self, pkg):
        """Block ``pkg`` when the very same build is already installed.

        Returns True (selection allowed) when nothing is installed under
        the package's canonical name, or when the installed package differs
        in ``version`` or ``pkg_version``.
        """
        try:
            installed_package = self.pypmenv.installed_store.find_only_package(
                pkg.canonical_name)
        except error.NoPackageInstalled:
            return True
        # NOTE(review): the original compared ``installed_package.pkg_version``
        # against ``pkg.version``, which looks like a copy-paste slip;
        # this assumes ``pkg`` exposes ``pkg_version`` -- confirm.
        return not (installed_package.version == pkg.version and
                    installed_package.pkg_version == pkg.pkg_version)
class RemovablePackageSelector(BasePackageSelector):
    """Selector for packages that can be removed.

    Adds nothing of its own: only what ``BasePackageSelector`` provides
    applies here.
    """
# | |
# Dependency management | |
# | |
class Dependency(PyPMFeature):
    """Resolve install-time dependencies and remove-time dependents."""

    def get_available_dependencies(self, package, selector=None):
        """Recursively collect the not-yet-installed dependencies of
        ``package``.

        ``selector`` accumulates the chosen packages; when omitted, a fresh
        ``InstallablePackageSelector`` is created. The selector is returned.

        Raises ``error.DependencyNotFound`` when no release in the repo
        store matches one of the requirements.
        """
        LOG.debug('Finding dependencies for {0.full_name}'.format(package))
        assert type(package) is RepoPackage

        if not selector:
            selector = InstallablePackageSelector(self.pypmenv)

        # Every requirement must be satisfied; a single unsatisfiable one
        # makes the whole installation fail.
        for requirement in _get_dependencies(package):
            releases = list(self.pypmenv.repo_store.find_package_releases(
                requirement.project_name))

            # Order the candidate releases so the latest possible version
            # is tried first (descending, see ``Package.__cmp__``).
            sort_packages_by_version(releases)

            for candidate in releases:
                # Only a version accepted by the requirement rule
                # (eg: Foo <= 2.9) may be picked.
                if candidate.version not in requirement:
                    continue

                # An already selected/installed candidate needs no work;
                # skipping it also keeps the recursion from looping.
                if not selector.is_selected_or_installed(candidate):
                    # Select *before* descending into the candidate's own
                    # dependencies, so cycles terminate.
                    selector.select(candidate)
                    self.get_available_dependencies(candidate, selector)

                # The first matching release settles this requirement.
                break
            else:
                raise error.DependencyNotFound(package.name, requirement)

        return selector

    def get_removable_dependents(self, installed_package):
        """Collect installed packages that depend, directly or indirectly,
        on ``installed_package``.

        Returns a ``RemovablePackageSelector`` holding those packages.
        """
        assert type(installed_package) is InstalledPackage

        selector = RemovablePackageSelector(self.pypmenv)
        rdepends = RDependsGraph(self.pypmenv)
        rdepends.flatten_from(installed_package.canonical_name, selector)
        return selector
# | |
# Dependency graph | |
# | |
class RDependsGraph(PyPMFeature):
    """A reverse dependency graph (DAG)

    ``self.graph`` maps a package name to the list of installed packages
    that require it.

    NOTE: if a package X does not have any other installed package depending on
    it, then it won't be part of the graph datastructure (``self.graph``). This
    effectively means that X is a hanging node with no relations.
    """

    def __init__(self, *a, **kw):
        super(RDependsGraph, self).__init__(*a, **kw)
        self._calculate()

    def _calculate(self):
        """Build the graph from the packages in the installed store."""
        depends_graph = {}
        for pkg in self.pypmenv.installed_store.find_all_packages():
            assert pkg.canonical_name not in depends_graph
            # Requirement names are lower-cased before being used as graph
            # keys. presumably to match ``canonical_name`` -- TODO confirm.
            depends_graph[pkg.canonical_name] = [
                req.project_name.lower()
                for req in _get_dependencies(pkg)]
        self.graph = RDependsGraph._invert_graph(depends_graph)

    @staticmethod
    def _invert_graph(graph):
        """Return ``graph`` with every edge reversed.

        ``graph`` maps a node to a list of nodes; the result maps each of
        those listed nodes back to the list of nodes that referenced it.
        """
        inverted_graph = {}
        for node in graph:
            for dnode in graph[node]:
                inverted_graph.setdefault(dnode, []).append(node)
        return inverted_graph

    def flatten_from(self, from_pkg_name, selector, _stack=None):
        """Flatten the sub-tree from node ``from_pkg_name`` and select the
        items, as instances of ``InstalledPackage``, of the flattened list to
        ``selector``.

        ``_stack`` is internal: the chain of package names currently being
        expanded, used to break circular dependencies.

        Returns ``selector`` on every path.
        """
        # Avoid a shared mutable default argument.
        if _stack is None:
            _stack = []

        if from_pkg_name not in self.graph:
            # no rdepends (i.e., no installed packages depend on `from_pkg_name`)
            return selector
        if selector.is_selected(from_pkg_name):
            return selector  # already processed

        LOG.debug('** flatten reverse dependents:\n %s // %s',
                  '->'.join(_stack),
                  from_pkg_name)
        for dep_pkg_name in self.graph[from_pkg_name]:
            if dep_pkg_name in _stack:
                continue  # circular dependency detected
            # Descend first, so indirect dependents are selected before
            # the direct dependent itself.
            self.flatten_from(dep_pkg_name, selector, _stack + [from_pkg_name])
            selector.select(
                self.pypmenv.installed_store.find_only_package(dep_pkg_name))
        return selector
def _get_dependencies(pkg, reqspec=None):
    """Get the dependencies of ``pkg``

    Returns a list of ``pkg_resources.Requirement`` objects parsed from the
    requirement strings stored under the ``''`` key of
    ``pkg.install_requires``.

    TODO: If reqspec is specified, use it. Until then a true ``reqspec``
    raises ``NotImplementedError``.
    """
    if reqspec:
        # Call-form ``raise`` is valid on both Python 2 and 3.
        raise NotImplementedError('need to support Foo[bar]')
    return [pkg_resources.Requirement.parse(reqstring)
            for reqstring in pkg.install_requires['']]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment