Skip to content

Instantly share code, notes, and snippets.

@nroi
Last active September 25, 2022 10:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nroi/492ccf2d55400746cb8084984e04002f to your computer and use it in GitHub Desktop.
Save nroi/492ccf2d55400746cb8084984e04002f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import datetime
import requests
import tarfile
import tempfile
import gzip
import os
import hashlib
# Check checksums of locally stored ArchLinux package files:
# If a locally stored ArchLinux package has a checksum that differs from the checksum
# this file should have (according to the database file downloaded from an official mirror),
# a warning is emitted. Additionally, mismatching files can be deleted
# with the --delete flag.
DB_SOURCE_URI = 'https://mirror.osbeck.com/archlinux'
def fetch_package_metadata(url, timeout=30):
    """Download a repository database and parse every package description in it.

    The response body is treated as a gzip-compressed tar archive. Every
    archive member whose name ends in ``/desc`` is read and handed to
    ``parse_package_desc``.

    Args:
        url: Location of the database file to download.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Without a timeout a stalled mirror would block the script forever.

    Returns:
        A list with one parsed description dict per ``desc`` member, or an
        empty list (after printing a notice) if the server did not answer
        with status code 200.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print('Skip URL {} due to status code {}'.format(url, response.status_code))
        return []

    # Context managers guarantee that the temporary file, the gzip wrapper and
    # the tar archive are all closed, even if parsing a member fails.
    with tempfile.TemporaryFile() as tmpfile:
        tmpfile.write(response.content)
        tmpfile.seek(0)
        with gzip.GzipFile(fileobj=tmpfile) as gzipfile, \
                tarfile.open(fileobj=gzipfile) as tar:
            return [
                package_desc_from_member(tar, member)
                for member in tar.getmembers()
                if member.name.endswith('/desc')
            ]
def package_desc_from_member(tar, member):
    """Read one archive member as UTF-8 text and parse it as a package description.

    Args:
        tar: The open tar archive that contains ``member``.
        member: The archive member to extract.

    Returns:
        Whatever ``parse_package_desc`` returns for the member's decoded text.
    """
    raw_bytes = tar.extractfile(member).read()
    return parse_package_desc(raw_bytes.decode('utf-8'))
def parse_package_desc(package_desc):
    """Extract checksum, file name and compressed size from a package description.

    The description is scanned line by line. When a line starts with one of
    the headers ``%SHA256SUM%``, ``%FILENAME%`` or ``%CSIZE%``, the line that
    follows it is taken as the value of that field. If a header occurs more
    than once, the last occurrence wins.

    Args:
        package_desc: The full text of a package description.

    Returns:
        A dict with the keys ``'sha256sum'``, ``'filename'`` and ``'csize'``.
        A field whose header is missing (or is the very last line, so that no
        value follows it) is ``None``; otherwise ``'csize'`` is converted to
        ``int`` and the other two fields are kept as strings.

    Raises:
        ValueError: If a ``%CSIZE%`` value is present but is not an integer.
    """
    headers = (
        ('%SHA256SUM%', 'sha256sum'),
        ('%FILENAME%', 'filename'),
        ('%CSIZE%', 'csize'),
    )
    fields = {key: None for _, key in headers}

    lines = package_desc.split('\n')
    # Pair every line with its successor so a header on the final line simply
    # has no value instead of triggering an IndexError.
    for line, following in zip(lines, lines[1:]):
        for header, key in headers:
            if line.startswith(header):
                fields[key] = following
                break

    # Only convert when a size was actually found; int(None) would raise.
    if fields['csize'] is not None:
        fields['csize'] = int(fields['csize'])
    return fields
def verify_checksums(directory, packages):
    """Compare the SHA-256 of locally stored package files with expected values.

    For every entry in ``packages`` the file ``directory/<filename>`` is
    hashed. Files that cannot be found are skipped silently, because a
    package that was never downloaded is not an error.

    Args:
        directory: Directory in which the package files are looked up.
        packages: Iterable of dicts with the keys ``'filename'``,
            ``'sha256sum'`` and ``'csize'``.

    Returns:
        A result dict as produced by ``empty_result``: ``'num_matches'``
        counts files whose checksum agrees, ``'mismatches'`` holds one detail
        dict per file whose checksum differs.
    """
    outcome = empty_result()
    for pkg in packages:
        file_path = os.path.join(directory, pkg['filename'])
        try:
            with open(file_path, 'rb') as handle:
                actual_digest = get_sha256sum(handle)
            file_info = os.stat(file_path)
            actual_size = file_info.st_size
            modified_at = datetime.datetime.fromtimestamp(file_info.st_mtime)
            recorded_cfs_size = get_cfs_filesize(file_path)
            if actual_digest == pkg['sha256sum']:
                outcome['num_matches'] += 1
            else:
                outcome['mismatches'].append({
                    'filename': pkg['filename'],
                    'path': file_path,
                    'local_sha256sum': actual_digest,
                    'remote_sha256sum': pkg['sha256sum'],
                    'local_filesize': actual_size,
                    'remote_filesize': pkg['csize'],
                    'local_mtime': modified_at,
                    'cfs_filesize': recorded_cfs_size,
                })
        except FileNotFoundError:
            # The package has not been downloaded, so there is nothing to verify.
            continue
    return outcome
def get_cfs_filesize(path):
    """Read the integer stored in the hidden ``.cfs`` companion file of ``path``.

    The companion file lives in the same directory as ``path`` and is named
    ``.<basename>.cfs``.

    Args:
        path: Path of the file whose companion file should be read.

    Returns:
        The companion file's content (trailing whitespace removed) as an
        ``int``, or ``-1`` if the companion file does not exist.
    """
    parent, name = os.path.split(path)
    companion = os.path.join(parent, '.{}.cfs'.format(name))
    try:
        with open(companion, 'r') as handle:
            text = handle.read()
    except FileNotFoundError:
        return -1
    return int(text.rstrip())
def aggregate_results(results):
    """Merge several per-repository results into one combined result.

    Args:
        results: Iterable of result dicts with the keys ``'num_matches'``
            and ``'mismatches'``.

    Returns:
        A fresh result dict whose ``'num_matches'`` is the sum of all inputs
        and whose ``'mismatches'`` is the concatenation of all input lists.
    """
    combined = empty_result()
    for partial in results:
        combined['num_matches'] = combined['num_matches'] + partial['num_matches']
        combined['mismatches'].extend(partial['mismatches'])
    return combined
def empty_result():
    """Return a new result dict with zero matches and an empty mismatch list."""
    return dict(num_matches=0, mismatches=[])
def get_sha256sum(fileobject):
    """Compute the SHA-256 hex digest of everything readable from ``fileobject``.

    The object is read in 4096-byte chunks until ``read`` returns ``b''``, so
    the whole content never has to be held in memory at once.

    Args:
        fileobject: A file-like object opened in binary mode.

    Returns:
        The hexadecimal SHA-256 digest as a string.
    """
    digest = hashlib.sha256()
    while True:
        block = fileobject.read(4096)
        if block == b'':
            break
        digest.update(block)
    return digest.hexdigest()
def main():
    """Check every repository found in the package directory and report mismatches.

    Each entry of the package directory is treated as a repository name. Its
    database is downloaded from ``DB_SOURCE_URI`` and the local files under
    ``<pkg-dir>/<repo>/os/x86_64/`` are verified against it. A report is
    printed, and with ``--delete`` every mismatching file is removed.
    """
    parser = argparse.ArgumentParser(description='Check pacman files for sha256sum mismatches.')
    parser.add_argument(
        '--delete',
        action='store_true',
        help='Delete all files that do not match',
    )
    parser.add_argument(
        '--flexo-pkg-dir',
        action='store',
        help='The directory where Flexo stores its packages',
        default='/var/cache/flexo/pkg',
        type=str,
    )
    args = parser.parse_args()

    pkg_root = args.flexo_pkg_dir
    per_repo_results = []
    for repo in os.listdir(pkg_root):
        db_url = '{}/{}/os/x86_64/{}.db'.format(DB_SOURCE_URI, repo, repo)
        repo_dir = '{}/{}/os/x86_64/'.format(pkg_root, repo)
        metadata = fetch_package_metadata(db_url)
        per_repo_results.append(verify_checksums(repo_dir, metadata))

    summary = aggregate_results(per_repo_results)
    mismatches = summary['mismatches']
    if not mismatches:
        print('No mismatches were detected. Matching files: {}'.format(summary['num_matches']))
        return

    print('Mismatches were detected for the following files:', end='\n\n')
    # One (line template, mismatch key) pair per report line, in print order.
    report_layout = (
        ('Filename:\t\t{}', 'filename'),
        ('File path:\t\t{}', 'path'),
        ('Expected sha256sum:\t{}', 'remote_sha256sum'),
        ('Actual sha256sum:\t{}', 'local_sha256sum'),
        ('Expected filesize:\t{}', 'remote_filesize'),
        ('Actual filesize:\t{}', 'local_filesize'),
        ('CFS filesize:\t\t{}', 'cfs_filesize'),
        ('Modification time:\t{}', 'local_mtime'),
    )
    for entry in mismatches:
        for template, key in report_layout:
            print(template.format(entry[key]))

    if args.delete:
        for entry in mismatches:
            print('Deleting file ' + entry['path'])
            os.remove(entry['path'])


# Run the check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment