Created
June 10, 2014 20:12
-
-
Save harlowja/6a185742de6d4887efe4 to your computer and use it in GitHub Desktop.
Rpm version mini-webserver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from BaseHTTPServer import (HTTPServer, BaseHTTPRequestHandler) | |
from optparse import OptionParser | |
from rpmUtils.miscutils import splitFilename | |
from yum import packages | |
import StringIO | |
import cgi | |
import httplib | |
import optparse | |
import os | |
import re | |
import sys | |
import urllib | |
import urlparse | |
import simplejson as json | |
class StringIOPlus(StringIO.StringIO): | |
def writeline(self, line): | |
self.write(line) | |
self.write(os.linesep) | |
def make_fake(name): | |
(name, ver, rel, epoch, arch) = splitFilename(name) | |
fake_pkg = packages.PackageObject() | |
fake_pkg.name = name | |
fake_pkg.arch = arch | |
fake_pkg.epoch = epoch | |
fake_pkg.version = ver | |
fake_pkg.release = rel | |
return fake_pkg | |
def dictify(pkg): | |
d = { | |
'name': pkg.name, | |
'epoch': pkg.epoch, | |
'version': pkg.version, | |
'release': pkg.release, | |
'architecture': pkg.arch, | |
} | |
return d | |
class RpmHandler(BaseHTTPRequestHandler): | |
def form_form(self): | |
output = StringIOPlus() | |
output.write((''' | |
<html> | |
<body> | |
<H1>RPM Version Checker 0.1</H1> | |
<form action="%WHERE%" method="GET"> | |
Base package: <input type="text" name="base" size="80" /><br /> | |
Compare package: <input type="text" name="compare0" size="80" /><br /><br /> | |
<input type="submit" value="Submit" /> | |
</form> | |
</body> | |
</html> | |
''')) | |
html = output.getvalue() | |
server = self.server | |
_root, port = server.server_address | |
replacewith = "http://%s:%s" % (server.server_name, port) | |
html = html.replace("%WHERE%", replacewith) | |
return ("text/html", html) | |
def form_response(self): | |
url = urlparse.urlparse(self.path) | |
qparams = url[4] | |
if not qparams: | |
return self.form_form() | |
qs = cgi.parse_qs(qparams) or {} | |
base = qs.get('base') | |
if not base: | |
return ('text/plain', "Missing required parameter 'base' (the base name to compare with)...") | |
if not isinstance(base, list): | |
base = [str(base)] | |
base = base[0] | |
base_p = make_fake(base) | |
items = [] | |
for (k, v) in qs.items(): | |
if re.match(r'compare[\w]*', k, re.I): | |
if not isinstance(v, list): | |
v = [str(v)] | |
items.extend(v) | |
if not items: | |
return ('text/plain', "Missing required parameters 'compare[\w]*' to compare against...") | |
nevars = [] | |
output = StringIOPlus() | |
output.writeline("Base package details:") | |
output.writeline(json.dumps(dictify(base_p), indent=4)) | |
for i in items: | |
compare_pkg = make_fake(i) | |
output.writeline("-" * 40) | |
output.writeline("Comparing base package to '%s'" % (i)) | |
output.writeline("Compare package details:") | |
output.writeline(json.dumps(dictify(compare_pkg), indent=4)) | |
if compare_pkg.name != base_p.name: | |
output.writeline("Results: Names are different!") | |
continue | |
if compare_pkg.arch != base_p.arch: | |
output.writeline("Results: Architectures are different!") | |
continue | |
res = packages.comparePoEVR(compare_pkg, base_p) | |
# return 1: a is newer than b | |
# 0: a and b are the same version | |
# -1: b is newer than a | |
if res >= 1: | |
output.writeline("Results: '%s' is newer than '%s'" % (i, base)) | |
elif res == 0: | |
output.writeline("Results: '%s' is the same as '%s'" % (i, base)) | |
else: | |
output.writeline("Results: '%s' is older than '%s'" % (i, base)) | |
return ('text/plain', output.getvalue()) | |
def do_GET(self): | |
who = self.client_address | |
print("Got a call from %s for path %s" % ( who, self.path)) | |
(ct, data) = self.form_response() | |
self.send_response(httplib.OK) | |
self.send_header("Content-Type", ct) | |
self.send_header("Content-Length", len(data)) | |
self.end_headers() | |
self.wfile.write(data) | |
def main(): | |
parser = OptionParser() | |
parser.add_option("-p", "--port", dest="port", | |
type="int", default=80, | |
help="port to run on", metavar="PORT") | |
(options, args) = parser.parse_args() | |
server = HTTPServer(('', options.port), RpmHandler) | |
sa = server.socket.getsockname() | |
print("Serving on %s using port %s ..." % ( sa[0], sa[1])) | |
server.serve_forever() | |
return 0 | |
if __name__ == '__main__': | |
rc = main() | |
print("Goodbye...[%s]" % (rc)) | |
sys.exit(rc) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment