Skip to content

Instantly share code, notes, and snippets.

@KevinHonka
Created May 7, 2018 12:15
Show Gist options
  • Save KevinHonka/4233906a1003b7672eff954cc40486e6 to your computer and use it in GitHub Desktop.
Save KevinHonka/4233906a1003b7672eff954cc40486e6 to your computer and use it in GitHub Desktop.
Script to collect apt packages
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function
import logging
import os
import subprocess
import sys
import apt
import apt_pkg
from enum import Enum
def _detect_distro_codename():
    """Return the release codename of the running system, or '' if unknown.

    ``lsb_release -c -s`` is tried first.  When that command is missing or
    fails, the ``VERSION_CODENAME`` entry of ``/etc/os-release`` is used
    instead, so that a host without ``lsb_release`` does not crash while the
    class is being defined.
    """
    try:
        return subprocess.check_output(["lsb_release", "-c", "-s"],
                                       universal_newlines=True).strip()
    except (OSError, subprocess.CalledProcessError):
        pass

    try:
        with open("/etc/os-release") as handle:
            for line in handle:
                key, _, value = line.partition("=")
                if key.strip() == "VERSION_CODENAME":
                    return value.strip().strip("\"'")
    except (OSError, IOError):
        pass
    return ""


class APT_Checker():
    """Collect pending APT package updates and report them.

    Typical use is ``get_update_packages()`` followed by ``print_result()``.
    ``print_result()`` terminates the process with one of the ``ExitCode``
    values: OK when nothing is pending, WARNING when updates are pending and
    CRITICAL when at least one of them is a security update.
    """

    class ExitCode(Enum):
        """
        Enum Class to better select ExitCodes
        """
        OK = 0
        WARNING = 1
        CRITICAL = 2
        UNKNOWN = 3

    # Pin file written by synaptic; read in addition to the default pin file.
    SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
    # Release codename, used to build the "<codename>-security" archive names.
    DISTRO = _detect_distro_codename()

    def __init__(self):
        # Filled by get_update_packages(); an empty list keeps print_result()
        # usable even if it is called first.
        self.packages = []
        self.log = logging.getLogger('APT_Checker')
        # The logger is shared by name between instances: attach the stdout
        # handler only once, otherwise every message is printed repeatedly.
        if not self.log.handlers:
            self.log.addHandler(logging.StreamHandler(sys.stdout))
        self.log.setLevel(logging.INFO)

    @staticmethod
    def clean(cache, depcache):
        """
        unmark (clean) all changes from the given depcache

        Declared static because it takes no ``self``; this makes both
        ``self.clean(cache, depcache)`` and ``APT_Checker.clean(cache,
        depcache)`` valid calls.  ``cache`` is accepted but not used.
        """
        depcache.init()

    def saveDistUpgrade(self, cache, depcache):
        """
        this functions mimics a upgrade but will never remove anything

        A dist-upgrade is marked first; if that would delete packages, all
        marks are reset.  A plain upgrade is marked afterwards in either case.
        """
        depcache.upgrade(True)
        if depcache.del_count > 0:
            self.clean(cache, depcache)
        depcache.upgrade()

    def get_update_packages(self):
        """
        Collect the pending package updates.

        The result is stored in ``self.packages`` and also returned.  It is a
        list of dicts with the keys ``name``, ``security``, ``section``,
        ``current_version``, ``candidate_version`` and ``priority``.

        Exits the process with ``ExitCode.UNKNOWN`` when the release codename
        could not be determined (security updates could not be recognised),
        and with ``ExitCode.CRITICAL`` when the APT cache cannot be opened or
        the upgrade cannot be marked.
        """
        if not self.DISTRO:
            self.log.error("Error: Unable to determine the distribution codename")
            sys.exit(self.ExitCode.UNKNOWN.value)

        pkgs = []
        apt_pkg.init()

        # force apt to build its caches in memory for now to make sure
        # that there is no race when the pkgcache file gets re-generated
        apt_pkg.config.set("Dir::Cache::pkgcache", "")

        try:
            cache = apt_pkg.Cache(apt.progress.base.OpProgress())
        except SystemError as e:
            self.log.error("Error: Opening the cache (%s)", e)
            sys.exit(self.ExitCode.CRITICAL.value)

        depcache = apt_pkg.DepCache(cache)
        # read the pin files
        depcache.read_pinfile()
        # read the synaptic pins too
        if os.path.exists(self.SYNAPTIC_PINFILE):
            depcache.read_pinfile(self.SYNAPTIC_PINFILE)
        # init the depcache
        depcache.init()

        try:
            self.saveDistUpgrade(cache, depcache)
        except SystemError as e:
            self.log.error("Error: Marking the upgrade (%s)", e)
            sys.exit(self.ExitCode.CRITICAL.value)

        for pkg in cache.packages:
            if not (depcache.marked_install(pkg) or depcache.marked_upgrade(pkg)):
                continue
            inst_ver = pkg.current_ver
            cand_ver = depcache.get_candidate_ver(pkg)
            if cand_ver == inst_ver:
                continue
            pkgs.append({
                "name": pkg.name,
                "security": self.isSecurityUpgrade(pkg, depcache),
                "section": pkg.section,
                "current_version": inst_ver.ver_str if inst_ver else '-',
                "candidate_version": cand_ver.ver_str if cand_ver else '-',
                "priority": cand_ver.priority_str if cand_ver else '-',
            })

        self.packages = pkgs
        return pkgs

    def isSecurityUpgrade(self, pkg, depcache):
        """
        Return True if upgrading ``pkg`` would pull in a security update.

        Both the candidate version and every version newer than the installed
        one are checked, so a security update that is masked by a candidate
        from another repository is still detected.
        """
        security_pockets = [("Ubuntu", "%s-security" % self.DISTRO),
                            ("gNewSense", "%s-security" % self.DISTRO),
                            ("Debian", "%s-updates" % self.DISTRO)]

        def isSecurityUpgrade_helper(ver):
            """
            check if the given version is a security update (or masks one)
            """
            if ver is None:
                # No such version (e.g. no candidate): nothing to inspect.
                return False
            for (pkg_file, _index) in ver.file_list:
                for origin, archive in security_pockets:
                    if pkg_file.archive == archive and pkg_file.origin == origin:
                        return True
            return False

        inst_ver = pkg.current_ver
        cand_ver = depcache.get_candidate_ver(pkg)

        if isSecurityUpgrade_helper(cand_ver):
            return True

        # now check for security updates that are masked by a
        # candidate version from another repo (-proposed or -updates)
        for ver in pkg.version_list:
            if (inst_ver and
                    apt_pkg.version_compare(ver.ver_str, inst_ver.ver_str) <= 0):
                # Not newer than what is installed: irrelevant for an upgrade.
                continue
            if isSecurityUpgrade_helper(ver):
                return True

        return False

    def print_result(self):
        """
        Print package updates in a table

        Always terminates the process: ``ExitCode.OK`` when no updates are
        pending, ``ExitCode.CRITICAL`` when at least one pending update is a
        security update, ``ExitCode.WARNING`` otherwise.
        """
        if not self.packages:
            self.log.info(' OK - No available updates on this machine')
            sys.exit(self.ExitCode.OK.value)

        # A real list is required here: the result is measured with len(),
        # which a Python 3 filter() iterator does not support.
        security_updates = [pkg for pkg in self.packages if pkg.get('security')]

        if security_updates:
            exitcode = self.ExitCode.CRITICAL
        else:
            exitcode = self.ExitCode.WARNING

        # The state name is the first output line, followed by the summary.
        text = [exitcode.name]
        text.append('%d packages can be updated.' % len(self.packages))
        text.append('%d updates are security updates.' % len(security_updates))
        text.append('-' * 100)

        # Table of all pending updates; security updates are flagged with '*'
        text.append('Package Name'.ljust(30) +
                    'Current Version'.ljust(30) +
                    'Latest Version'.ljust(30) +
                    'Security'.ljust(10))
        text.append('-' * 100)
        for pkg in self.packages:
            text.append('{:<30}{:<30}{:<30}{:<10}'.format(
                pkg.get('name'),
                pkg.get('current_version'),
                pkg.get('candidate_version'),
                '*' if pkg.get('security') else ''))
        text.append('=' * 100)

        self.log.info('\n'.join(text))
        sys.exit(exitcode.value)
if __name__ == '__main__':
    # When not running with effective uid 0, re-run the very same command
    # line through sudo.
    if os.geteuid() != 0:
        sudo_argv = ['sudo', sys.executable]
        sudo_argv.extend(sys.argv)
        # Replaces the current process with sudo; nothing below this call is
        # executed in the unprivileged process.
        os.execvpe('sudo', sudo_argv, os.environ)

    # Collect the pending updates, then print the report; print_result()
    # terminates the process with the matching exit code.
    apt_checker = APT_Checker()
    apt_checker.get_update_packages()
    apt_checker.print_result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment