Skip to content

Instantly share code, notes, and snippets.

@foucault
Created June 12, 2021 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save foucault/2acd2e2616d4f07ebe35474f83ed3587 to your computer and use it in GitHub Desktop.
Save foucault/2acd2e2616d4f07ebe35474f83ed3587 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
import ctypes
import difflib
import glob
import json
import os
import os.path
import re
import requests
import shutil
import subprocess
import sys
import tempfile
# Name used to derive this script's private database directory.
SCRIPT_NAME = "checkupdpy"

# Prefer the XDG cache directory. Fall back to a fixed path under /tmp when
# the optional `xdg` module is missing (ImportError) or does not expose the
# expected attribute (AttributeError). Only these are caught so that
# KeyboardInterrupt/SystemExit and genuine bugs are not silently swallowed.
try:
    from xdg import BaseDirectory
    DBPATH = os.path.join(BaseDirectory.xdg_cache_home, "%s" % SCRIPT_NAME)
except (ImportError, AttributeError):
    DBPATH = os.path.join('/tmp', "%s-db" % SCRIPT_NAME)

# Sub-directories of DBPATH used as the private `local` and `sync` databases.
LOCALDB = os.path.join(DBPATH, 'local')
SYNCDB = os.path.join(DBPATH, 'sync')
# Version comparison is delegated to libalpm's `alpm_pkg_vercmp`, loaded
# through ctypes from the shared library.
ALPM = ctypes.CDLL('libalpm.so')
vercmp = ALPM.alpm_pkg_vercmp
# Both parameters are C strings, so callers must pass `bytes`; the return
# type is left at the ctypes default (C int).
vercmp.argtypes = [ctypes.c_char_p] * 2
# Template for an ANSI escape sequence; the %d slot takes the numeric code.
BASE_ESC = "\033[%dm"

# Escape sequences used when stdout is a terminal, keyed by attribute name.
ESC_TERM = {
    name: BASE_ESC % code
    for name, code in (
        ("reset", 0),
        ("bold", 1),
        ("black", 30),
        ("red", 31),
        ("green", 32),
        ("yellow", 33),
        ("blue", 34),
        ("magenta", 35),
        ("cyan", 36),
        ("white", 37),
    )
}

# Same keys mapped to empty strings, used when stdout is not a terminal so
# that formatting code can index either table without emitting escapes.
ESC_NOTATERM = dict.fromkeys(ESC_TERM, '')
def printerr(*args, **kwargs):
    """Print to standard error using the same arguments as ``print``.

    A caller-supplied ``file`` keyword is discarded so that output always
    goes to ``sys.stderr``; every other keyword argument (``sep``, ``end``,
    ``flush``) is forwarded to ``print`` unchanged.
    """
    # Drop the key only. Rebinding kwargs to the popped value would replace
    # the keyword dict with the file object and break the ** expansion below.
    kwargs.pop('file', None)
    print(*args, file=sys.stderr, **kwargs)
def parse_repo_updates(raw):
    """Parse update listings into ``(pkg, local, remote)`` tuples.

    Each line of *raw* is expected to hold at least four
    whitespace-separated fields: package name, installed version, a
    separator token (ignored) and the new version. Any fields after the
    fourth (for example a trailing marker appended by the listing tool) are
    dropped. Blank or shorter lines are skipped instead of raising, so one
    odd line does not discard the whole listing.

    Args:
        raw: Text with one update per line.

    Returns:
        A list of ``(pkg, local, remote)`` string tuples in input order.
    """
    packages = []
    for line in raw.splitlines():
        fields = line.split()
        if len(fields) < 4:
            continue
        pkg, local, _, remote = fields[:4]
        packages.append((pkg, local, remote))
    return packages
def format_line(clr, tag, pkg, local, remote):
    """Build one ``[tag] pkg local → remote`` report line.

    The installed version is rendered in red and the new version in green.
    From the first position where the two version strings diverge, both are
    additionally rendered in bold.

    Args:
        clr: Mapping with the keys ``red``, ``green``, ``blue``, ``bold``
            and ``reset`` (for example ``ESC_TERM`` or ``ESC_NOTATERM``).
        tag: Short source label shown in brackets.
        pkg: Package name.
        local: Installed version string.
        remote: Available version string.

    Returns:
        The formatted line, without a trailing newline.
    """
    # ndiff is fed the two strings character by character. Entries for
    # unchanged characters start with two spaces, so the index of the first
    # '-', '+' or '?' entry is the number of leading characters that ndiff
    # reports as unchanged.
    idx = next(
        (i for i, val in enumerate(difflib.ndiff(local, remote))
         if val.startswith(('-', '+', '?'))),
        None,
    )

    if idx is not None:
        localstr = "%s%s%s%s%s" % (clr['red'], local[:idx], clr['bold'],
                                   local[idx:], clr['reset'])
        remotestr = "%s%s%s%s%s" % (clr['green'], remote[:idx], clr['bold'],
                                    remote[idx:], clr['reset'])
    else:
        # No difference found: keep the same colours as the branch above
        # (remote is green, not red), just without the bold part.
        localstr = "%s%s%s" % (clr['red'], local, clr['reset'])
        remotestr = "%s%s%s" % (clr['green'], remote, clr['reset'])

    return "[%s%s%s] %s%s%s %s → %s" % (
        clr['blue'], tag, clr['reset'],
        clr['bold'], pkg, clr['reset'],
        localstr, remotestr,
    )
def get_aur_updates(raw, timeout=30):
    """Return the AUR packages whose remote version is newer than the local one.

    Args:
        raw: Text with one ``<name> <version>`` pair per line. Blank lines
            and lines with fewer than two fields are skipped.
        timeout: Seconds to wait for the AUR server before giving up, so an
            unresponsive server cannot hang the script indefinitely.

    Returns:
        A list of ``(name, local, remote)`` tuples for every package where
        ``vercmp(local, remote)`` is negative. Empty when *raw* contains no
        packages; no request is sent in that case.

    Raises:
        ValueError: If the AUR responds with a status code other than 200.
        requests.exceptions.RequestException: On network errors or timeout.
    """
    local_pkgs = {}
    for line in raw.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue
        local_pkgs[fields[0]] = fields[1]

    if not local_pkgs:
        # Nothing to look up; avoid a pointless request without arguments.
        return []

    aur_data = requests.get('https://aur.archlinux.org/rpc/?v=5&type=info',
                            params={'arg[]': list(local_pkgs)},
                            timeout=timeout)
    if aur_data.status_code != 200:
        raise ValueError("Unexpected status code from AUR")

    packages = []
    for entry in aur_data.json()['results']:
        name = entry['Name']
        local = local_pkgs.get(name)
        if local is None:
            # Defensive: ignore results for names that were not requested.
            continue
        remote = entry['Version']
        # vercmp is bound with c_char_p arguments, hence the bytes encoding.
        if vercmp(local.encode(), remote.encode()) < 0:
            packages.append((name, local, remote))
    return packages
def main():
    """Print pending repo and AUR updates without touching pacman's databases.

    A private database directory (``DBPATH``) is prepared: its ``local``
    entry is a symlink to pacman's real local database, and its ``sync``
    directory holds copies of pacman's sync databases. These are refreshed
    with ``pacman -Sy`` run under ``fakeroot``. Repo updates come from
    ``pacman -Qu`` and foreign packages from ``pacman -Qm``; the latter are
    checked via ``get_aur_updates``.

    Returns:
        1 when the pacman database location cannot be determined, the real
        local database is not a directory, or ``DBPATH/local`` exists but
        does not resolve to the real local database; 0 otherwise.
    """
    clr = ESC_TERM if sys.stdout.isatty() else ESC_NOTATERM

    try:
        raw_dbpath = subprocess.check_output(['/usr/bin/pacman-conf', 'DBPath'])
        pacman_dir = raw_dbpath.decode().strip()
        true_local_db = os.path.join(pacman_dir, 'local')
        true_sync_db = os.path.join(pacman_dir, 'sync')
    except Exception as exc:
        # could not extract true local db
        printerr("Could not determine pacman directory:", exc)
        return 1

    if not os.path.isdir(true_local_db):
        printerr("Pacman local database is not a directory: %s" % true_local_db)
        return 1

    os.makedirs(DBPATH, exist_ok=True)

    # lexists() also sees a dangling symlink, which exists() reports as
    # missing and which would make os.symlink() below fail. Both sides are
    # canonicalised so a pacman DBPath that itself goes through a symlink is
    # still recognised as the real database.
    if os.path.lexists(LOCALDB) and (
            os.path.realpath(LOCALDB) != os.path.realpath(true_local_db)):
        # dbpath/local exists but it doesn't point to the real pacmandb
        printerr('%s exists but does not point to the real pacman db: %s' %
                 (LOCALDB, true_local_db))
        return 1

    if not os.path.lexists(LOCALDB):
        os.symlink(true_local_db, LOCALDB)

    if not os.path.exists(SYNCDB):
        os.mkdir(SYNCDB)

    for db in glob.glob(os.path.join(true_sync_db, '*.db')):
        target_db = os.path.join(SYNCDB, os.path.basename(db))
        orig_db_mtime = os.stat(db).st_mtime
        try:
            db_mtime = os.stat(target_db).st_mtime
        except FileNotFoundError:
            db_mtime = None

        if db_mtime is None or db_mtime < orig_db_mtime:
            # reuse pacman syncdbs if newer before sync
            shutil.copy(db, target_db)

    # Refresh the private sync databases. The exit status is deliberately
    # not checked; only stdout is silenced, so pacman's own error output
    # stays visible.
    subprocess.run(['/usr/bin/fakeroot', '--',
                    '/usr/bin/pacman', '-Sqy',
                    '--dbpath', DBPATH, '--logfile', '/dev/null'],
                   stdout=subprocess.DEVNULL)

    repo_packages = []
    aur_packages = []

    # First, repo packages
    try:
        raw_repo_updates = subprocess.check_output(
            ['/usr/bin/pacman', '-Qu',
             '--dbpath', DBPATH, '--logfile', '/dev/null'],
            stderr=subprocess.DEVNULL)
        repo_packages.extend(
            parse_repo_updates(raw_repo_updates.decode().strip()))
    except subprocess.CalledProcessError:
        # Could not find updates for repo packages
        pass
    except Exception as exc:
        printerr("Unexpected exception when retrieving repo updates:", exc)

    for (pkg, local, remote) in repo_packages:
        print(format_line(clr, 'REPO', pkg, local, remote))

    # and then AUR packages
    try:
        local_packages = subprocess.check_output(
            ['/usr/bin/pacman', '-Qm',
             '--dbpath', DBPATH, '--logfile', '/dev/null'],
            stderr=subprocess.DEVNULL)
        aur_packages.extend(
            get_aur_updates(local_packages.decode().strip()))
    except subprocess.CalledProcessError:
        # no local packages
        pass
    except Exception as exc:
        printerr("Unexpected exception when processing AUR packages:", exc)

    for (pkg, local, remote) in aur_packages:
        print(format_line(clr, 'AUR ', pkg, local, remote))

    return 0
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment