Skip to content

Instantly share code, notes, and snippets.

@coordt
Created July 5, 2011 23:38
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save coordt/1066221 to your computer and use it in GitHub Desktop.
Save coordt/1066221 to your computer and use it in GitHub Desktop.
Show local, remote and current versions of installed packages using Fabric
from __future__ import with_statement
from fabric.api import env, run, settings, hide, local
from fabric.decorators import hosts, runs_once
import os
import pip
import sys, xmlrpclib
from cStringIO import StringIO
from distutils.version import StrictVersion, LooseVersion
def _find_current_version(package, index_urls=None):
"""
Using the XMLRPC method available for PyPI, get the most recent version
of <package> from each of the index_urls and figure out which one (if any)
is higher
Returns a tuple of the index with the higher version and the version it has
"""
if index_urls is None:
index_urls = ['http://pypi.python.org/pypi',]
cur_version = '0'
cur_index = ''
for index_url in index_urls:
pypi = xmlrpclib.ServerProxy(index_url, xmlrpclib.Transport())
pypi_hits = pypi.package_releases(package)
if len(pypi_hits) > 0:
if compare_versions(pypi_hits[0], cur_version) == 1:
cur_version = pypi_hits[0]
cur_index = index_url
return cur_index, cur_version
def compare_versions(version1, version2):
"""
Compare 2 versions, starting with StrictVersion, and falling back on
LooseVersion
"""
v1 = str(version1) or '0'
v2 = str(version2) or '0'
try:
return cmp(StrictVersion(v1), StrictVersion(v2))
except ValueError:
return cmp(LooseVersion(v1), LooseVersion(v2))
def _get_package_list():
"""
Get the list of currently installed packages and versions via pip freeze
"""
packages = {}
with settings(
hide('warnings', 'running', 'stdout', 'stderr'),
warn_only=True
):
result = run("%sbin/pip freeze -l" % env.venv)
for package in result.splitlines():
if package.startswith("-e"):
version, pkg = package.split("#egg=")
else:
pkg, version = package.split("==")
packages[pkg] = version
return packages
def _get_local_package_list():
"""
Call pip's freeze -l
returns a list of package_name, version tuples
"""
with hide('running'):
out = local('pip freeze -l -E %s' % os.environ['VIRTUAL_ENV'], capture=True)
packages = {}
for package in out.splitlines():
if package.startswith("-e"):
version, pkg = package.split("#egg=")
else:
pkg, version = package.split("==")
packages[pkg] = version
return packages
def _format_output(servers, special, summary, only_problems):
"""
given some records, format the table
"""
import curses
curses.setupterm()
CLEAR_SCREEN = curses.tigetstr('clear')
BOLD = curses.tigetstr("bold")
NORMAL = curses.tigetstr('sgr0')
headers = ['MOU', 'Package ']
first = True
pkgs = sorted(summary.keys())
for pkg in pkgs:
values = summary[pkg]
if first:
headers.extend("{0:>15}".format(srv) for srv in special[pkg].keys())
headers.extend("{0:>15}".format(srv) for srv in servers[pkg].keys())
header_string = " ".join(headers)
print "%s%s%s" % (CLEAR_SCREEN, BOLD, header_string)
print "%s%s" % ("=" * len(header_string), NORMAL)
first = False
if only_problems:
if (not values['Missing'] and
not values['Out-of-Sync'] and
not values['Updateable']):
continue
record = ["{0:1}{1:1}{2:1}".format(
values["Missing"] and "M" or " ",
values["Out-of-Sync"] and "O" or " ",
values["Updateable"] and "U" or " "),]
record.append("%s{0:15}%s".format(pkg[:15]) % (BOLD, NORMAL))
record.extend("{0:>15}".format(srv[:15]) for srv in special[pkg].values())
record.extend("{0:>15}".format(srv[:15]) for srv in servers[pkg].values())
print " ".join(record)
print ""
print "%sLegend" % BOLD
print "======%s" % NORMAL
print "%sM%s One or more servers are missing this package entirely" % (BOLD, NORMAL)
print "%sO%s The versions of the packages are out-of-sync across servers" % (BOLD, NORMAL)
print "%sU%s The version installed can be updated." % (BOLD, NORMAL)
@runs_once
def check_versions(index_urls=None, include_local=True, only_problems=True):
"""
Check the versions of all the packages on all the servers and print out
the out of sync packages
"""
package_versions = {}
package_servers = {}
current_versions = {}
package_special = {} # for local and current versions
if index_urls is None:
index_urls = ['http://pypi.python.org/pypi',]
for host in env.hosts:
with settings(host_string=host):
print "Getting packages on %s" % host
for pkg, version in _get_package_list().items():
if pkg not in package_versions:
package_versions[pkg] = {}
if version not in package_versions[pkg]:
package_versions[pkg][version] = []
package_versions[pkg][version].append(host)
if pkg not in package_servers:
package_servers[pkg] = dict(zip(env.hosts, ('',) * len(env.hosts)))
package_servers[pkg][host] = version
if include_local:
for pkg, version in _get_local_package_list().items():
if pkg not in package_versions:
package_versions[pkg] = {}
if version not in package_versions[pkg]:
package_versions[pkg][version] = []
package_versions[pkg][version].append('local')
if pkg not in package_servers:
package_servers[pkg] = dict(zip(env.hosts, ('',) * len(env.hosts)))
if pkg not in package_special:
package_special[pkg] = {}
package_special[pkg]['local'] = version
package_summary = {}
num_pkgs = len(package_servers.keys())
import curses
curses.setupterm()
sys.stdout.write("Checking versions %2d of %2d" % (0, num_pkgs))
sys.stdout.flush()
for i, pkg in enumerate(package_servers.keys()):
sys.stdout.write(curses.tigetstr('cr') + curses.tigetstr('el'))
sys.stdout.write("Checking versions %2d of %2d" % (i + 1, num_pkgs))
sys.stdout.flush()
index, version = _find_current_version(pkg, index_urls)
current_versions[pkg] = {'version': version, 'index': index}
if pkg not in package_special:
package_special[pkg] = {'local':''}
package_special[pkg]['current'] = version
current_record = {"Missing": False, "Out-of-Sync": False, "Updateable": False}
current_record['Out-of-Sync'] = len(package_versions[pkg].keys()) > 1
current_record['Missing'] = '' in package_servers[pkg].values()
for server, installed_version in package_servers[pkg].items():
if compare_versions(current_versions[pkg]['version'], installed_version) == 1:
current_record['Updateable'] = True
break
package_summary[pkg] = current_record
_format_output(package_servers, package_special, package_summary, only_problems)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment