Created
June 12, 2021 23:22
-
-
Save foucault/2acd2e2616d4f07ebe35474f83ed3587 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import ctypes | |
import difflib | |
import glob | |
import json | |
import os | |
import os.path | |
import re | |
import requests | |
import shutil | |
import subprocess | |
import sys | |
import tempfile | |
SCRIPT_NAME = "checkupdpy" | |
try: | |
from xdg import BaseDirectory | |
DBPATH = os.path.join(BaseDirectory.xdg_cache_home, "%s" % SCRIPT_NAME) | |
except: | |
DBPATH = os.path.join('/tmp', "%s-db" % SCRIPT_NAME) | |
LOCALDB = os.path.join(DBPATH, 'local') | |
SYNCDB = os.path.join(DBPATH, 'sync') | |
# pick vercmp from libalpm | |
ALPM = ctypes.CDLL('libalpm.so') | |
vercmp = ALPM.alpm_pkg_vercmp | |
vercmp.argtypes = [ctypes.c_char_p, ctypes.c_char_p] | |
BASE_ESC = "\033[%dm" | |
ESC_TERM = { | |
"reset": BASE_ESC % 0, | |
"bold": BASE_ESC % 1, | |
"black": BASE_ESC % 30, | |
"red": BASE_ESC % 31, | |
"green": BASE_ESC % 32, | |
"yellow": BASE_ESC % 33, | |
"blue": BASE_ESC % 34, | |
"magenta": BASE_ESC % 35, | |
"cyan": BASE_ESC % 36, | |
"white": BASE_ESC % 37 | |
} | |
ESC_NOTATERM = { | |
"reset": '', | |
"bold": '', | |
"black": '', | |
"red": '', | |
"green": '', | |
"yellow": '', | |
"blue": '', | |
"magenta": '', | |
"cyan": '', | |
"white": '' | |
} | |
def printerr(*args, **kwargs): | |
if 'file' in kwargs: | |
kwargs = kwargs.pop('file') | |
print(*args, file=sys.stderr, **kwargs) | |
def parse_repo_updates(raw): | |
packages = [] | |
for line in raw.splitlines(): | |
(pkg, local, _, remote) = re.split('\s+', line) | |
packages.append((pkg, local, remote)) | |
return packages | |
def format_line(clr, tag, pkg, local, remote): | |
idx = None | |
for (i, val) in enumerate(difflib.ndiff(local, remote)): | |
if val.startswith(('-', '+', '?')): | |
idx = i | |
break | |
if idx is not None: | |
localstr = "%s%s%s%s%s" % (clr['red'], local[:idx], clr['bold'], | |
local[idx:], clr['reset']) | |
remotestr = "%s%s%s%s%s" % (clr['green'], remote[:idx], clr['bold'], | |
remote[idx:], clr['reset']) | |
else: | |
localstr = "%s%s%s" % (clr['red'], local, clr['reset']) | |
remotestr = "%s%s%s" % (clr['red'], remote, clr['reset']) | |
return "[%s%s%s] %s%s%s %s → %s" % \ | |
( | |
clr['blue'], tag, clr['reset'], | |
clr['bold'], pkg, clr['reset'], | |
localstr, remotestr | |
) | |
def get_aur_updates(raw): | |
local_pkgs = {} | |
for line in raw.splitlines(): | |
(pkg, local) = re.split('\s+', line) | |
local_pkgs[pkg] = local | |
aur_data = requests.get('https://aur.archlinux.org/rpc/?v=5&type=info', | |
params={'arg[]': local_pkgs.keys()}) | |
if aur_data.status_code != 200: | |
raise ValueError("Unexpected status code from AUR") | |
response = aur_data.json()['results'] | |
packages = [] | |
for entry in response: | |
name = entry['Name'] | |
local = local_pkgs[name] | |
remote = entry['Version'] | |
cmpres = vercmp(local.encode(), remote.encode()) | |
if cmpres < 0: | |
packages.append((name, local, remote)) | |
return packages | |
def main(): | |
if sys.stdout.isatty(): | |
clr = ESC_TERM | |
else: | |
clr = ESC_NOTATERM | |
try: | |
PACMANDIR = subprocess.check_output(['/usr/bin/pacman-conf', 'DBPath']) | |
TRUELOCALDB = os.path.join(PACMANDIR.decode().strip(), 'local') | |
TRUESYNCDB = os.path.join(PACMANDIR.decode().strip(), 'sync') | |
except Exception as exc: | |
# could not extract true local db | |
printerr("Could not determine pacman directory:", exc) | |
return 1 | |
if not os.path.isdir(TRUELOCALDB): | |
printerr("Pacman local database is not a directory: %s" % TRUELOCALDB) | |
return 1 | |
if not os.path.isdir(DBPATH): | |
os.makedirs(DBPATH, exist_ok=True) | |
if os.path.exists(LOCALDB) and (os.path.realpath(LOCALDB) != TRUELOCALDB): | |
# dbpath/local exists but it doesn't point to the real pacmandb | |
printerr('%s exists but does not point to the real pacman db: %s' % | |
(LOCALDB, TRUELOCALDB)) | |
return 1 | |
if not os.path.exists(LOCALDB): | |
os.symlink(TRUELOCALDB, LOCALDB) | |
if not os.path.exists(SYNCDB): | |
os.mkdir(SYNCDB) | |
for db in glob.glob(os.path.join(TRUESYNCDB, '*.db')): | |
db_basename = os.path.basename(db) | |
orig_db_mtime = os.stat(db).st_mtime | |
target_db = os.path.join(SYNCDB, db_basename) | |
try: | |
db_mtime = os.stat(target_db).st_mtime | |
except FileNotFoundError: | |
db_mtime = None | |
if db_mtime is None or db_mtime < orig_db_mtime: | |
# reuse pacman syncdbs if newer before sync | |
shutil.copy(db, target_db) | |
subprocess.run(['/usr/bin/fakeroot', '--', | |
'/usr/bin/pacman', '-Sqy', | |
'--dbpath', DBPATH, '--logfile', '/dev/null' ], | |
stdout=subprocess.DEVNULL) | |
repo_packages = [] | |
aur_packages = [] | |
# First, repo packages | |
try: | |
raw_repo_updates = subprocess.check_output(['/usr/bin/pacman', '-Qu', | |
'--dbpath', DBPATH, '--logfile', '/dev/null' ], | |
stderr=subprocess.DEVNULL) | |
repo_updates = parse_repo_updates(raw_repo_updates.decode().strip()) | |
repo_packages.extend(repo_updates) | |
except subprocess.CalledProcessError as cpe: | |
# Could not find updates for repo packages | |
pass | |
except Exception as exc: | |
printerr("Unexpected exception when retrieving repo updates:", exc) | |
for (pkg, local, remote) in repo_packages: | |
print(format_line(clr, 'REPO', pkg, local, remote)) | |
# and then AUR packages | |
try: | |
local_packages = subprocess.check_output(['/usr/bin/pacman', '-Qm', | |
'--dbpath', DBPATH, '--logfile', '/dev/null'], | |
stderr=subprocess.DEVNULL) | |
aur_updates = get_aur_updates(local_packages.decode().strip()) | |
aur_packages.extend(aur_updates) | |
except subprocess.CalledProcessError as cpe: | |
# no local packages | |
pass | |
except Exception as exc: | |
printerr("Unexpected exception when processing AUR packages:", exc) | |
for (pkg, local, remote) in aur_packages: | |
print(format_line(clr, 'AUR ', pkg, local, remote)) | |
return 0 | |
if __name__ == "__main__": | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment