Created
November 9, 2015 20:24
-
-
Save harlowja/9b443d1af13a57a48f4f to your computer and use it in GitHub Desktop.
rpm-info.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python

import contextlib
import os
import shutil
import subprocess
import sys
import tempfile

from clint.textui import progress
import requests
from tabulate import tabulate
def is_url(rpm_filename):
    """Return True when *rpm_filename* starts with an http(s) scheme."""
    # str.startswith accepts a tuple of prefixes; one call covers both.
    return rpm_filename.startswith(("http://", "https://"))
@contextlib.contextmanager
def tempdir(**kwargs):
    """Context manager yielding the path of a new temporary directory.

    All keyword arguments are forwarded to ``tempfile.mkdtemp``. The
    directory and everything in it is removed when the ``with`` block
    exits, whether normally or via an exception.
    """
    # A stdlib equivalent seems to have only been added in python 3.2,
    # so make our own since it's useful...
    # See: http://bugs.python.org/file12970/tempdir.patch
    tdir = tempfile.mkdtemp(**kwargs)
    try:
        yield tdir
    finally:
        shutil.rmtree(tdir)
def dump_rpm_infos(rpm_infos):
    """Print a grid table with one row per rpm: base file name, patch count.

    *rpm_infos* is an iterable of ``(rpm_filename, rpm_info)`` pairs, where
    ``rpm_info['patches']`` is a sized collection (the shape returned by
    ``get_rpm_info``).
    """
    headers = ['Filename', 'Patches']
    table = [
        (os.path.basename(rpm_filename), len(rpm_info['patches']))
        for (rpm_filename, rpm_info) in rpm_infos
    ]
    print(tabulate(table, headers=headers, tablefmt="grid"))
def tiny_p(cmd, capture=True, cwd=None):
    """Run *cmd* to completion and return its ``(stdout, stderr)``.

    cmd: argument list handed to ``subprocess.Popen``.
    capture: when true (the default) both streams are piped and returned
        as text; when false they are inherited from this process and the
        returned pair is ``(None, None)``.
    cwd: working directory for the child, or None to inherit.

    Raises RuntimeError when the command exits with a non-zero status.
    """
    stream = subprocess.PIPE if capture else None
    sp = subprocess.Popen(cmd, stdout=stream,
                          stderr=stream, stdin=None,
                          universal_newlines=True, cwd=cwd)
    (out, err) = sp.communicate()
    ret = sp.returncode
    if ret != 0:
        raise RuntimeError("Failed running %s [rc=%s] (%s, %s)"
                           % (cmd, ret, out, err))
    return (out, err)
def get_rpm_info(rpm_filename):
    """Unpack one rpm and report the patch files found at its top level.

    *rpm_filename* is either a local path or an http(s) URL (downloaded
    first). The rpm is placed in a scratch directory, converted to a cpio
    archive with ``rpm2cpio``, extracted with ``cpio``, and the top level
    of the extracted tree is scanned for names ending in ``.patch``.

    Returns a dict with a single key, ``'patches'``, holding the list of
    matching file names.

    Raises RuntimeError when ``rpm2cpio`` or ``cpio`` exits non-zero; a
    failed download raises whatever ``requests`` raises.
    """
    with tempdir() as tdir:
        if is_url(rpm_filename):
            short_rpm_filename = os.path.basename(rpm_filename)
            rpm_filename_url = rpm_filename
            req = requests.get(rpm_filename_url, stream=True)
            # Fail on 4xx/5xx instead of saving an error page as the rpm.
            req.raise_for_status()
            rpm_filename = os.path.join(tdir, short_rpm_filename)
            print("Downloading: %s" % (rpm_filename_url))
            chunks = req.iter_content(chunk_size=1024)
            # Thanks: http://stackoverflow.com/questions/15644964/python-progress-bar-and-downloads
            content_length = req.headers.get('content-length')
            if content_length is not None:
                # The bar needs a size up front; without the header (e.g.
                # a chunked response) just download with no progress bar.
                chunks = progress.bar(
                    chunks,
                    expected_size=(int(content_length) // 1024) + 1)
            with open(rpm_filename, 'wb') as tfile:
                for chunk in chunks:
                    if chunk:
                        tfile.write(chunk)
        else:
            temp_rpm_filename = os.path.join(tdir,
                                             os.path.basename(rpm_filename))
            shutil.copyfile(rpm_filename, temp_rpm_filename)
            rpm_filename = temp_rpm_filename
        # rpm_filename now lives inside tdir, so siblings are derived by
        # appending a suffix to it.
        rpm_cpio_filename = "%s.cpio" % rpm_filename
        cmd = ['rpm2cpio', rpm_filename]
        # The cpio stream is binary: send it straight to the file rather
        # than capturing it as text and re-writing it.
        with open(rpm_cpio_filename, 'wb') as tfile:
            sp = subprocess.Popen(cmd, stdout=tfile,
                                  stderr=subprocess.PIPE, stdin=None,
                                  universal_newlines=True)
            (_, err) = sp.communicate()
        if sp.returncode != 0:
            raise RuntimeError("Failed running %s [rc=%s] (%s)"
                               % (cmd, sp.returncode, err))
        rpm_cpio_extract_dir = "%s.cpio_extracted" % rpm_filename
        os.makedirs(rpm_cpio_extract_dir)
        # NOTE(review): the archive may come from a remote rpm; consider
        # whether member paths need to be restricted during extraction.
        tiny_p(['cpio', '-iv', '--file', rpm_cpio_filename],
               cwd=rpm_cpio_extract_dir)
        rpm_info = {
            'patches': [
                filename
                for filename in os.listdir(rpm_cpio_extract_dir)
                if filename.endswith(".patch")
            ],
        }
        return rpm_info
def main():
    """Command-line entry point.

    Treats every argument as an rpm path or URL, gathers info for each one
    and prints a summary table. Returns the process exit status: 1 (after
    printing a usage line) when no arguments were given, otherwise 0.
    """
    if len(sys.argv) == 1:
        print("%s rpm rpm*" % os.path.basename(sys.argv[0]))
        return 1
    rpms = sys.argv[1:]
    header = "Getting details for %s rpms" % len(rpms)
    rule = "-" * len(header)
    print(rule)
    print(header)
    print(rule)
    rpm_infos = [
        (rpm_filename, get_rpm_info(rpm_filename))
        for rpm_filename in rpms
    ]
    print("-------------")
    print("Gathered info")
    print("-------------")
    dump_rpm_infos(rpm_infos)
    return 0
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment