Skip to content

Instantly share code, notes, and snippets.

@freyes
Forked from wolsen/cmadison.py
Created May 12, 2016 19:18
Show Gist options
  • Save freyes/37ed2f0c5bb730594169226f2fb6d645 to your computer and use it in GitHub Desktop.
Save freyes/37ed2f0c5bb730594169226f2fb6d645 to your computer and use it in GitHub Desktop.
rmadison + cloud-archive madison
#!/usr/bin/env python
#
# Provides a rather basic version of rmadison (or dak ls if you prefer)
# for the Ubuntu cloud-archive.
#
# This script works in the following manner:
# 1) It will show the rmadison output for the selected package to show
# the values of packages within the main ubuntu archives
# 2) It will show similar output for the selected package in the ubuntu
# cloud archives.
#
from lxml import etree
import collections
import gzip
import logging as log
import os.path
import shutil
import subprocess
import sys
import tempfile
import urllib2
# Defines the default Ubuntu cloud-archive repository URL.
UCA_DEB_REPO_URL = "http://ubuntu-cloud.archive.canonical.com/ubuntu/dists"
# This is where the Sources.gz files will be downloaded to.
# In the future, it'd be better to have these cached - but
# I'll /assume/ bandwidth is decent enough that it's not a super big issue.
# NOTE: the directory is created as soon as this module is loaded; it is
# removed again by the shutil.rmtree() call in the __main__ block.
working_dir = tempfile.mkdtemp()
def exclude_values(seq, values):
    """
    Excludes a set of values from a sequence.

    :param seq: the iterable to filter.
    :param values: the value(s) to exclude. Either a single value or an
                   iterable of values. A string is always treated as ONE
                   value, never as a collection of characters.
    :return: a generator yielding the items of seq which are not in values.
    """
    # The ABCs live in collections.abc on Python 3 (and were removed from
    # the collections namespace in 3.10); Python 2 only has the old location.
    try:
        from collections.abc import Iterable
    except ImportError:
        from collections import Iterable
    try:
        string_types = (basestring,)
    except NameError:
        string_types = (str, bytes)

    # Strings are iterable, but using one as the exclusion collection would
    # turn the membership test below into a substring test.
    if isinstance(values, string_types) or not isinstance(values, Iterable):
        values = [values]
    for item in seq:
        if item not in values:
            yield item
def get_files_in_remote_url(relative_path=""):
    """
    Returns a list of files found in the remote URL specified.

    This is heavily dependent upon being able to browse the folders over
    http as the ubuntu cloud archives are. If that changes, then this
    script needs to be revisited.

    :param relative_path: a path relative to the UCA_DEB_REPO_URL
    :return: list of files or folders found in the remote url, with any
             trailing '/' removed from folder names.
    """
    url = "%s/%s" % (UCA_DEB_REPO_URL, relative_path)
    response = urllib2.urlopen(url)
    try:
        root = etree.parse(response, etree.HTMLParser())
    finally:
        # Release the HTTP connection as soon as the page has been parsed
        # rather than leaving it to the garbage collector.
        response.close()

    # Content available here should be directory listing, which is presented
    # as a table, with each file in its own row. Use xpath expression to find
    # the values of the text within the table columns.
    files = []
    for name in root.xpath('//*/td/*/text()'):
        # Skip the canonical parent directory nav link
        if name == 'Parent Directory':
            continue
        # Folders are listed with a trailing slash; report the bare name.
        if name.endswith('/'):
            name = name[:-1]
        files.append(name)

    log.debug("Found files at %s: %s", url, files)
    return files
def get_available_dists():
    """
    Lists the distributions published at the root of the cloud archive.

    Every folder found at the repository root maps to one distribution.
    Folders whose name ends in '-proposed' and folders whose name starts
    with 'precise' are left out of the result.

    :return: list of distribution names.
    """
    available = []
    for name in get_files_in_remote_url():
        if name.endswith('-proposed'):
            # -proposed pockets are ignored for now; this should probably
            # become a command line option.
            log.debug('Skipping folder %s' % name)
        elif name.startswith('precise'):
            log.debug('Skipping precise folder')
        else:
            available.append(name)
    return available
def get_openstack_releases(dist):
    """
    Looks up the OpenStack releases published for a distribution.

    The releases are the entries listed in the distribution's folder of
    the remote repository.

    :param dist: the distribution to retrieve openstack releases for.
    :return: list of OpenStack release names.
    """
    releases = get_files_in_remote_url(dist)
    log.debug("Found OpenStack releases for dist %s: %s", dist, releases)
    return releases
class Sources(object):
    """
    Local copy of the Sources.gz index for one distribution / OpenStack
    release combination of the cloud archive.
    """

    def __init__(self, dist, os_release):
        """
        Creates a new Sources which represents the Sources.gz file
        for the source folder in the specified distro and OpenStack
        release. The file is downloaded into working_dir immediately.

        :param dist: the Ubuntu distribution
        :param os_release: the OpenStack release
        """
        fname = '%s_%s_Sources.gz' % (dist, os_release)
        self.dist = dist
        self.os_release = os_release
        self.fname = os.path.join(working_dir, fname)
        self.download()

    def download(self):
        """
        Downloads the file to parse Source information from.

        The response is streamed to self.fname so the whole index never has
        to be held in memory. Errors raised by urllib2.urlopen propagate to
        the caller.
        """
        url = ("%(base_url)s/%(dist)s/%(os_release)s/main/source/Sources.gz" %
               {'base_url': UCA_DEB_REPO_URL,
                'dist': self.dist,
                'os_release': self.os_release})
        response = urllib2.urlopen(url)
        try:
            with open(self.fname, 'wb') as f:
                shutil.copyfileobj(response, f)
        finally:
            # Always release the HTTP connection, even if the write fails.
            response.close()

    def get_sources(self):
        """
        A generator returning the Source package descriptors
        found in the downloaded Sources.gz file (self.fname).

        :return: yields one Source per stanza in the file.
        """
        lines = []
        with gzip.open(self.fname) as sources_file:
            for line in sources_file:
                if line.strip():
                    lines.append(line)
                elif lines:
                    # Empty line is the end of the source package stanza.
                    # Repeated blank lines are ignored so no empty Source
                    # is ever produced.
                    stanza, lines = lines, []
                    yield Source.parse(''.join(stanza))
        # The last stanza is not necessarily followed by a blank line.
        if lines:
            yield Source.parse(''.join(lines))
class Source(dict):
    """
    A single source package stanza from a Sources file, stored as a dict
    of field name -> field value with convenience accessors for the fields
    this script uses.
    """

    @property
    def package(self):
        """The source package name (the 'Package' field)."""
        return self['Package']

    @property
    def binaries(self):
        """
        The binary packages built from this source (the 'Binary' field),
        as a list of names.
        """
        # Split on the bare comma and strip each name so that lists folded
        # over several lines are handled regardless of the spacing used.
        names = (name.strip() for name in self['Binary'].split(','))
        return [name for name in names if name]

    @property
    def version(self):
        """The source package version (the 'Version' field)."""
        return self['Version']

    @property
    def architecture(self):
        """The 'Architecture' field of the stanza."""
        return self['Architecture']

    @classmethod
    def parse(cls, text):
        """
        Parses basic content from the Sources.gz file in a debian archive for
        retrieving basic information.

        :param text: the text of one stanza to parse
        :return: an instance of this class populated with the stanza fields
        """
        src = cls()
        key = None
        for line in text.split('\n'):
            if not line.strip():
                # Blank lines (e.g. the one after the final newline) carry
                # no field and must not end up as an empty key.
                continue
            if line[0] in ' \t':
                # Continuation from the previous line. A continuation with
                # no preceding field has nothing to attach to and is dropped.
                if key is not None:
                    src[key] = src[key] + line
                continue
            # Split on the first colon only: values such as versions with an
            # epoch or URLs contain colons themselves, and fields with an
            # empty first line ("Files:") have no space after the colon.
            key, _, value = line.partition(':')
            src[key] = value.strip()
        return src
def print_table(table):
    """
    Prints the table in a nice formatted output.

    Every column is padded to the width of its widest cell and columns are
    separated by ' | '. Rows may have different lengths; a short row simply
    ends after its last cell.

    :param table: a table in a traditional representation
                  (a list of lists of strings)
    """
    # Compute the widths column by column instead of via zip(*table), which
    # would silently truncate to the shortest row and make longer rows fail.
    widths = []
    for row in table:
        for i, cell in enumerate(row):
            if i == len(widths):
                widths.append(len(cell))
            elif len(cell) > widths[i]:
                widths[i] = len(cell)

    for row in table:
        out = " | ".join("{:{}}".format(cell, widths[i])
                         for i, cell in enumerate(row))
        # Parenthesised single argument: valid as both the Python 2 print
        # statement and the Python 3 print function.
        print(" " + out)
def do_rmadison_search(search_for):
    """
    Runs the search for the packages using rmadison.

    :param search_for: the package names to hand to rmadison
    :return: a list of rows, each row being the list of '|'-separated
             columns of one rmadison output line. An empty list is returned
             (and the error logged) when rmadison cannot be run or fails.
    """
    cmd = ['rmadison'] + list(search_for)
    try:
        # universal_newlines gives text output with normalised line endings.
        output = subprocess.check_output(cmd, universal_newlines=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError: rmadison is not installed / not executable.
        # CalledProcessError: rmadison exited with a non-zero status.
        log.error("Error querying rmadison: %s", str(e))
        return []

    # Skip blank lines explicitly rather than assuming that exactly one
    # trailing empty line follows the results.
    return [[column.strip() for column in line.split(' |')]
            for line in output.splitlines() if line.strip()]
def do_search():
    """
    Runs the search for packages in the cloud archive.

    The package names are taken from the command line (sys.argv[1:]). Each
    one is looked up in the Sources index of every distribution / OpenStack
    release combination of the cloud archive and, through rmadison, in the
    main archives. All matches are printed as one table sorted by package
    name.
    """
    search_for = sys.argv[1:]
    matches = []
    for dist in get_available_dists():
        for os_release in get_openstack_releases(dist):
            try:
                sources = Sources(dist, os_release)
            except urllib2.HTTPError as e:
                # Not every dist/release folder has a source index to
                # download; note it and move on to the next one.
                log.debug("Skipping %s/%s, unable to fetch Sources.gz: %s",
                          dist, os_release, e)
                continue

            for src in sources.get_sources():
                for pkg in search_for:
                    if src.package == pkg:
                        mtype = 'source'
                    elif pkg in src.binaries:
                        mtype = src.architecture
                    else:
                        # Not a match, continue
                        continue
                    matches.append([pkg,
                                    src.version,
                                    'cloud-archive:%s' % os_release,
                                    mtype])

    matches = matches + do_rmadison_search(search_for)
    print_table(sorted(matches, key=lambda row: row[0]))
if __name__ == '__main__':
    try:
        if len(sys.argv) < 2:
            print("E: need at least one package name as an argument.")
            # sys.exit() instead of the exit() helper, which is only
            # injected by the site module and not guaranteed to exist.
            sys.exit(1)
        do_search()
    finally:
        # Always remove the temporary download directory, including when
        # exiting early or when the search raised.
        shutil.rmtree(working_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment