Skip to content

Instantly share code, notes, and snippets.

@iquiw
Created November 21, 2017 03:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iquiw/d32a204f305c21521d44d67579ce667d to your computer and use it in GitHub Desktop.
Save iquiw/d32a204f305c21521d44d67579ce667d to your computer and use it in GitHub Desktop.
#! /usr/bin/python3
import apt
import apt_pkg
import json
import requests
import subprocess
webhook_url = '<WEBHOOK_URL>'
channel = '<CHANNEL>'
# copied from /usr/lib/update-notifier/apt-check
DISTRO = subprocess.check_output(
["lsb_release", "-c", "-s"],
universal_newlines=True).strip()
# copied from /usr/lib/update-notifier/apt-check
def isSecurityUpgrade(ver):
" check if the given version is a security update (or masks one) "
security_pockets = [("Ubuntu", "%s-security" % DISTRO),
("gNewSense", "%s-security" % DISTRO),
("Debian", "%s-updates" % DISTRO)]
for (file, index) in ver.file_list:
for origin, archive in security_pockets:
if (file.archive == archive and file.origin == origin):
return True
return False
class PrintAcquireProgress(apt.progress.base.AcquireProgress):
def fetch(self, x):
print('Fetched from ' + x.description)
def acu_update():
cache = apt_pkg.Cache(None)
source_list = apt_pkg.SourceList()
source_list.read_main_list()
cache.update(PrintAcquireProgress(), source_list)
return apt_pkg.Cache()
def acu_check(cache):
dep_cache = apt_pkg.DepCache(cache)
update_count = 0
security_count = 0
fields = []
for pack in cache.packages:
if pack.current_state == apt_pkg.CURSTATE_INSTALLED and dep_cache.is_upgradable(pack):
cand_ver = dep_cache.get_candidate_ver(pack)
update_count += 1
tmpl = '{} => {}'
if isSecurityUpgrade(cand_ver):
tmpl = '{} => *{}*'
security_count += 1
fields.append({
'title': pack.name,
'value': tmpl.format(pack.current_ver.ver_str, cand_ver.ver_str),
'short': True
})
fields.sort(key=lambda x: x['title'])
return (update_count, security_count, fields)
def acu_notify(update_count, security_count, fields, dryrun = False):
message = ''
if update_count == 0:
title = '今日のアップデートはありません。'
message = json.dumps({
'channel': channel,
'attachments': [{
'fallback': title,
'color': 'good',
'title': title
}]
}, ensure_ascii=False)
else:
title = '{} 個のパッケージがアップデート可能です。'.format(update_count)
color = 'warning'
if security_count > 0:
title += '\n{} 個のアップデートはセキュリティアップデートです。'.format(security_count)
color = 'danger'
message = json.dumps({
'channel': channel,
'attachments': [{
'fallback': title,
'color': color,
'title': title,
'fields': fields,
'mrkdwn_in': [ 'fields' ]
}]
}, ensure_ascii=False)
if dryrun:
print(message)
else:
r = requests.post(webhook_url, data={ 'payload': message })
print(r.status_code)
print(r.text)
def main():
apt_pkg.init()
cache = acu_update()
(update_count, security_count, fields) = acu_check(cache)
acu_notify(update_count, security_count, fields)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment