Skip to content

Instantly share code, notes, and snippets.

@tschwaerzl
Forked from yumminhuang/package_updates_check.py
Last active February 8, 2024 19:40
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tschwaerzl/2f7e92891e720643d9aa2b955d3c9e6e to your computer and use it in GitHub Desktop.
Save tschwaerzl/2f7e92891e720643d9aa2b955d3c9e6e to your computer and use it in GitHub Desktop.
Python script to check apt-get updates and send result to Slack channel
#!/usr/bin/env python
#coding=utf-8
import apt
import apt_pkg
from time import strftime
import json
import os
import requests
import subprocess
import sys
import socket
"""
Following functions are used to return package info of available updates.
See: /usr/lib/update-notifier/apt_check.py
"""
# Extra pin/preferences file; get_update_packages() passes it to
# depcache.read_pinfile() (after the argument-less call) only when the
# path exists.
SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
# Whitespace-stripped text output of `lsb_release -c -s`, captured once at
# import time (presumably the release codename -- confirm on the target
# system). isSecurityUpgrade() interpolates it into the "%s-security" and
# "%s-updates" archive names it compares against.
DISTRO = subprocess.check_output(["lsb_release", "-c", "-s"],
universal_newlines=True).strip()
def clean(cache, depcache):
    """Discard every pending change recorded in ``depcache``.

    ``cache`` is part of the signature but is not used by this function.
    """
    # Re-initialising the depcache replaces a per-package "mark keep" loop,
    # which the original author noted was too inefficient with the
    # auto-mark code.
    depcache.init()
def saveDistUpgrade(cache, depcache):
    """Mark an upgrade on ``depcache`` that never removes anything.

    ``depcache.upgrade(True)`` is tried first. If that leaves any package
    marked for deletion (``del_count > 0``), all marks are reset via
    ``clean()`` and ``depcache.upgrade()`` is called without arguments
    instead.
    """
    depcache.upgrade(True)
    removals_pending = depcache.del_count > 0
    if not removals_pending:
        return
    # The first attempt wanted to delete packages: start over and fall
    # back to the argument-less upgrade.
    clean(cache, depcache)
    depcache.upgrade()
def get_update_packages():
    """Collect the packages that an upgrade would install or upgrade.

    Returns:
        A list with one dict per package, holding the keys ``name``,
        ``security``, ``section``, ``current_version``,
        ``candidate_version`` and ``priority``. A missing installed or
        candidate version is reported as ``'-'``.

    The process exits with status -1 (after writing a message to stderr)
    when opening the cache or marking the upgrade raises ``SystemError``.
    """
    apt_pkg.init()
    # Force apt to build its caches in memory for now, so there is no race
    # when the pkgcache file gets re-generated.
    apt_pkg.config.set("Dir::Cache::pkgcache", "")

    try:
        cache = apt_pkg.Cache(apt.progress.base.OpProgress())
    except SystemError as err:
        sys.stderr.write("Error: Opening the cache (%s)" % err)
        sys.exit(-1)

    depcache = apt_pkg.DepCache(cache)
    # Read the pin files, including the synaptic one when it is present,
    # before initialising the depcache.
    depcache.read_pinfile()
    if os.path.exists(SYNAPTIC_PINFILE):
        depcache.read_pinfile(SYNAPTIC_PINFILE)
    depcache.init()

    try:
        saveDistUpgrade(cache, depcache)
    except SystemError as err:
        sys.stderr.write("Error: Marking the upgrade (%s)" % err)
        sys.exit(-1)

    updates = []
    for pkg in cache.packages:
        is_marked = (depcache.marked_install(pkg) or
                     depcache.marked_upgrade(pkg))
        if not is_marked:
            continue

        installed = pkg.current_ver
        candidate = depcache.get_candidate_ver(pkg)
        if candidate == installed:
            # Nothing would change for this package.
            continue

        updates.append({
            "name": pkg.name,
            "security": isSecurityUpgrade(pkg, depcache),
            "section": pkg.section,
            "current_version": installed.ver_str if installed else '-',
            "candidate_version": candidate.ver_str if candidate else '-',
            "priority": candidate.priority_str,
        })
    return updates
def isSecurityUpgrade(pkg, depcache):
    """Tell whether upgrading ``pkg`` is a security update (or masks one).

    Returns True when the candidate version, or any version in
    ``pkg.version_list`` that is newer than the installed one, is listed
    in a file whose (origin, archive) pair matches one of the security
    pockets built from ``DISTRO``. Returns False otherwise.
    """
    security_pockets = [("Ubuntu", "%s-security" % DISTRO),
                        ("gNewSense", "%s-security" % DISTRO),
                        ("Debian", "%s-updates" % DISTRO)]

    def _in_security_pocket(ver):
        """Check whether any file listing ``ver`` is a security pocket."""
        return any(pkg_file.archive == archive and pkg_file.origin == origin
                   for pkg_file, _index in ver.file_list
                   for origin, archive in security_pockets)

    installed = pkg.current_ver
    candidate = depcache.get_candidate_ver(pkg)
    if _in_security_pocket(candidate):
        return True

    def _not_newer_than_installed(ver):
        """True when ``ver`` is at or below the installed version."""
        return (installed and
                apt_pkg.version_compare(ver.ver_str, installed.ver_str) <= 0)

    # Now check for security updates that are masked by a candidate
    # version from another repo (-proposed or -updates).
    return any(_in_security_pocket(ver)
               for ver in pkg.version_list
               if not _not_newer_than_installed(ver))
def print_result(pkgs):
    """Render the available package updates as a plain-text report.

    Args:
        pkgs: list of dicts as built by get_update_packages(). Each dict
            must provide string values for 'name', 'current_version' and
            'candidate_version'; a truthy 'security' value marks the row
            as a security update.

    Returns:
        The report as one newline-joined string. It always starts with a
        'Check Time' line; when ``pkgs`` is empty a single "no updates"
        line follows, otherwise the host name, the counters and a
        fixed-width table of the packages.
    """
    def _fit(value, limit=18):
        """Shorten ``value`` to ``limit`` characters, ending in '..'."""
        # Only mark values that were actually cut; short values used to
        # get a misleading '..' suffix as well.
        if len(value) <= limit:
            return value
        return value[:limit - 2] + '..'

    # Count with a generator: on Python 3 filter() returns an iterator,
    # so len(filter(...)) raises TypeError.
    security_count = sum(1 for pkg in pkgs if pkg.get('security'))
    hostname = socket.gethostname()

    text = ['Check Time: %s' % strftime('%m/%d/%Y %H:%M:%S')]
    if not pkgs:
        text.append('No available updates on %s.' % hostname)
    else:
        # Updates are available, build a table
        text.append('Server: %s' % hostname)
        text.append('%d packages can be updated.' % len(pkgs))
        text.append('%d updates are security updates.' % security_count)
        text.append('-' * 65)
        text.append('Package Name'.ljust(20) +
                    'Current Version'.ljust(20) +
                    'Latest Version'.ljust(20) +
                    'Sec.'.ljust(5))
        text.append('-' * 65)
        for pkg in pkgs:
            text.append('{:<20}{:<20}{:<20}{:<5}'.format(
                _fit(pkg.get('name')),
                _fit(pkg.get('current_version')),
                _fit(pkg.get('candidate_version')),
                '*' if pkg.get('security') else ''))
        text.append('=' * 65)
    return '\n'.join(text)
if __name__ == '__main__':
    # Gather the pending updates and render the plain-text report that is
    # embedded in the Slack message.
    pkgs = get_update_packages()
    available_updates = print_result(pkgs)

    hostname = socket.gethostname()
    update_count = len(pkgs)
    # Count with a generator: on Python 3 filter() returns an iterator,
    # so len(filter(...)) raises TypeError.
    security_count = sum(1 for pkg in pkgs if pkg.get('security'))

    # The same summary line is used as fallback and as pretext.
    summary = 'Server: %s - %d Updates (%d Security) available' % (
        hostname, update_count, security_count)
    slack_data = {
        'attachments': [
            {
                'fallback': summary,
                'pretext': summary,
                'color': '#00CCF0',
                'title': '%s' % hostname,
                # Triple backticks make Slack render the table monospaced.
                'text': '```%s```' % available_updates,
                'mrkdwn_in': ['text'],
            }
        ]
    }

    # Placeholder URL: replace with the real incoming-webhook URL.
    webhook_url = 'https://hooks.slack.com/services/XXXXXXX/XXXXXXXXX/XXXXXXXX' #devops
    # A timeout keeps an unattended run from hanging forever when Slack
    # is unreachable; requests applies no timeout by default.
    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'},
        timeout=30
    )
    print('%s - %s' % (response.status_code, response.text))
@tschwaerzl
Copy link
Author

Updated to send notification to SLACK.

Demo:

image

@petarkozic
Copy link

petarkozic commented Apr 12, 2018

This is awesome. Thanks for this !
I made a modified fork for CentOS that outputs only the package list, in the same format.

https://gist.github.com/petarkozic/19aa8f53dc4e00c2cfa85b84d54e9abd

screen shot 2018-04-17 at 2 02 26 pm

@amitchettri
Copy link

Hello @tschwaerzl,

Do you have a similar script for yum?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment