Created
May 7, 2018 12:15
-
-
Save KevinHonka/4233906a1003b7672eff954cc40486e6 to your computer and use it in GitHub Desktop.
Script to collect apt packages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
from __future__ import print_function | |
import logging | |
import os | |
import subprocess | |
import sys | |
import apt | |
import apt_pkg | |
from enum import Enum | |
class APT_Checker(): | |
def __init__(self): | |
self.log = logging.getLogger('APT_Checker') | |
streamhandler = logging.StreamHandler(sys.stdout) | |
self.log.addHandler(streamhandler) | |
self.log.setLevel(logging.INFO) | |
class ExitCode(Enum): | |
""" | |
Enum Class to better select ExitCodes | |
""" | |
OK = 0 | |
WARNING = 1 | |
CRITICAL = 2 | |
UNKNOWN = 3 | |
SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences" | |
DISTRO = subprocess.check_output(["lsb_release", "-c", "-s"], | |
universal_newlines=True).strip() | |
def clean(cache, depcache): | |
""" | |
unmark (clean) all changes from the given depcache | |
""" | |
depcache.init() | |
def saveDistUpgrade(self, cache, depcache): | |
""" | |
this functions mimics a upgrade but will never remove anything | |
""" | |
depcache.upgrade(True) | |
if depcache.del_count > 0: | |
self.clean(cache, depcache) | |
depcache.upgrade() | |
def get_update_packages(self): | |
""" | |
Return a list of dict about package updates | |
""" | |
pkgs = [] | |
apt_pkg.init() | |
# force apt to build its caches in memory for now to make sure | |
# that there is no race when the pkgcache file gets re-generated | |
apt_pkg.config.set("Dir::Cache::pkgcache", "") | |
try: | |
cache = apt_pkg.Cache(apt.progress.base.OpProgress()) | |
except SystemError as e: | |
self.logging.error("Error: Opening the cache (%s)" % e) | |
sys.exit(self.ExitCode['CRITICAL'].value) | |
depcache = apt_pkg.DepCache(cache) | |
# read the pin files | |
depcache.read_pinfile() | |
# read the synaptic pins too | |
if os.path.exists(self.SYNAPTIC_PINFILE): | |
depcache.read_pinfile(self.SYNAPTIC_PINFILE) | |
# init the depcache | |
depcache.init() | |
try: | |
self.saveDistUpgrade(cache, depcache) | |
except SystemError as e: | |
self.log.error("Error: Marking the upgrade (%s)" % e) | |
sys.exit(self.ExitCode['CRITICAL'].value) | |
# use assignment here since apt.Cache() doesn't provide a __exit__ method | |
# on Ubuntu 12.04 it looks like | |
# aptcache = apt.Cache() | |
for pkg in cache.packages: | |
if not (depcache.marked_install(pkg) or depcache.marked_upgrade(pkg)): | |
continue | |
inst_ver = pkg.current_ver | |
cand_ver = depcache.get_candidate_ver(pkg) | |
if cand_ver == inst_ver: | |
continue | |
record = {"name": pkg.name, | |
"security": self.isSecurityUpgrade(pkg, depcache), | |
"section": pkg.section, | |
"current_version": inst_ver.ver_str if inst_ver else '-', | |
"candidate_version": cand_ver.ver_str if cand_ver else '-', | |
"priority": cand_ver.priority_str} | |
pkgs.append(record) | |
self.packages = pkgs | |
def isSecurityUpgrade(self, pkg, depcache): | |
def isSecurityUpgrade_helper(ver): | |
""" | |
check if the given version is a security update (or masks one) | |
""" | |
security_pockets = [("Ubuntu", "%s-security" % self.DISTRO), | |
("gNewSense", "%s-security" % self.DISTRO), | |
("Debian", "%s-updates" % self.DISTRO)] | |
for (file, index) in ver.file_list: | |
for origin, archive in security_pockets: | |
if (file.archive == archive and file.origin == origin): | |
return True | |
return False | |
inst_ver = pkg.current_ver | |
cand_ver = depcache.get_candidate_ver(pkg) | |
if isSecurityUpgrade_helper(cand_ver): | |
return True | |
# now check for security updates that are masked by a | |
# canidate version from another repo (-proposed or -updates) | |
for ver in pkg.version_list: | |
if (inst_ver and | |
apt_pkg.version_compare(ver.ver_str, inst_ver.ver_str) <= 0): | |
continue | |
if isSecurityUpgrade_helper(ver): | |
return True | |
return False | |
def print_result(self): | |
""" | |
Print package updates in a table | |
""" | |
security_updates = filter(lambda x: x.get('security'), self.packages) | |
text = list() | |
if not self.packages: | |
self.log.info(' OK - No available updates on this machine') | |
sys.exit(self.ExitCode['OK'].value) | |
else: | |
# Updates are available, build a table | |
if len(security_updates) > 0: | |
text.append('CRITICAL') | |
exitcode = "CRITICAL" | |
elif len(self.packages) > 0: | |
text.append('WARNING') | |
exitcode = "WARNING" | |
text.append('%d packages can be updated.' % len(self.packages)) | |
text.append('%d updates are security updates.' % len(security_updates)) | |
text.append('-' * 100) | |
# List available security updates | |
text.append('Package Name'.ljust(30) + | |
'Current Version'.ljust(30) + | |
'Latest Version'.ljust(30) + | |
'Security'.ljust(10)) | |
text.append('-' * 100) | |
for pkg in self.packages: | |
text.append('{:<30}{:<30}{:<30}{:<10}'.format(pkg.get('name'), | |
pkg.get( | |
'current_version'), | |
pkg.get( | |
'candidate_version'), | |
'*' if pkg.get('security') else '')) | |
text.append('=' * 100) | |
self.log.info('\n'.join(text)) | |
sys.exit(self.ExitCode[exitcode].value) | |
if __name__ == '__main__': | |
euid = os.geteuid() | |
if euid != 0: | |
args = ['sudo', sys.executable] + sys.argv + [os.environ] | |
# the next line replaces the currently-running process with the sudo | |
os.execlpe('sudo', *args) | |
checker = APT_Checker() | |
checker.get_update_packages() | |
checker.print_result() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment