Last active
March 21, 2023 15:51
-
-
Save PtaxLaine/58e91164a090aacc222c0c34c65345e9 to your computer and use it in GitHub Desktop.
check pacman repository integrity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/bash | |
set -e | |
find . -name "*.db" -print0 | xargs -0 -n1 sh -c "/usr/bin/pacman-check.py $1 \$0 || exit 255" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import tarfile | |
import sys | |
import io | |
import hashlib | |
import argparse | |
from pathlib import Path | |
READ_BLOCK_SIZE = 1024 * 1024 * 10 | |
def calc_fs_hash(fs): | |
hasher = hashlib.sha256() | |
while data := fs.read(READ_BLOCK_SIZE): | |
hasher.update(data) | |
return hasher.hexdigest() | |
def check(desc, directory, show_only_errors, check_hashes): | |
file_name = desc['FILENAME'][0] | |
target_file_size = int(desc['CSIZE'][0]) | |
target_sha256sum = desc['SHA256SUM'][0] | |
try: | |
fs = open(directory / file_name, 'rb') | |
actual_file_size = fs.seek(0, io.SEEK_END) | |
fs.seek(0, io.SEEK_SET) | |
fs_sig = open(directory / f'{file_name}.sig') | |
if check_hashes: | |
actual_sha256sum = calc_fs_hash(fs) | |
except Exception as err: | |
print('ERR : ', file_name, ' : ', err) | |
return False | |
if target_file_size != actual_file_size: | |
print(f'ERR : {file_name} : INVALID FILE SIZE : {target_file_size} vs {actual_file_size}') | |
return False | |
if check_hashes: | |
if target_sha256sum == actual_sha256sum: | |
if not show_only_errors: | |
print(f'OK : {file_name} : {target_sha256sum}') | |
return True | |
else: | |
print(f'ERR : {file_name} : INVALID CHECKSUM : {target_sha256sum} vs {actual_sha256sum}') | |
return False | |
else: | |
print(f'OK : {file_name}') | |
return True | |
def main(db_file, show_only_errors, stop_on_first_error, check_hashes): | |
directory = db_file.parent | |
db = tarfile.open(db_file, 'r') | |
db_iter = (x for x in db if x.name.endswith('/desc')) | |
db_iter = (x for x in db_iter if x.isfile) | |
error_count = 0 | |
for member in db_iter: | |
member_name = member.name | |
desc_file = db.extractfile(member).read().decode('UTF-8') | |
desc_items = desc_file.split('\n\n') | |
desc_items = (x.strip() for x in desc_items) | |
desc_items = (x.split('\n') for x in desc_items if x) | |
desc_items = ([x[0].replace('%', ''), *x[1:]] for x in desc_items) | |
desc_items = {x[0]: x[1:] for x in desc_items} | |
if not check(desc_items, directory, show_only_errors, check_hashes): | |
error_count += 1 | |
if stop_on_first_error: | |
break | |
if error_count: | |
exit(-1) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument('database', help='database file (*.db)') | |
parser.add_argument('--check-hashes', dest='check_hashes', default=False, help='check repository hashes', action='store_true') | |
parser.add_argument('--show-only-errors', dest='show_only_errors', default=False, help='don\'t report succesful checks', action='store_true') | |
parser.add_argument('--continue-after-error', dest='continue_after_error', default=False, help='don\'t stop process when error happened', action='store_true') | |
args = parser.parse_args() | |
db_file = Path(args.database) | |
show_only_errors = args.show_only_errors | |
stop_on_first_error = not args.continue_after_error | |
check_hashes = args.check_hashes | |
main(db_file, show_only_errors, stop_on_first_error, check_hashes) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment