Skip to content

Instantly share code, notes, and snippets.

@kived
Created March 3, 2015 22:44
Show Gist options
  • Save kived/68ad4f035a80fb2488d3 to your computer and use it in GitHub Desktop.
Save kived/68ad4f035a80fb2488d3 to your computer and use it in GitHub Desktop.
aptly: import from .changes files
#!/usr/bin/env python3
REPO_FORMAT = 'eepa-{dist}-build'
import sys
import os, os.path
import hashlib
import subprocess
def parse_changes(filename):
parser_state = 0
parse_name = None
changes = {}
filename = os.path.realpath(os.path.expanduser(filename))
with open(filename, 'r') as f:
for line in f:
if parser_state == 1:
if len(line) and line[0] == ' ':
changes[parse_name].append(line[1:].rstrip())
else:
parser_state = 0
if parser_state == 0:
name, value = [x.strip() for x in line.split(':', 1)]
if value:
changes[name] = value
else:
changes[name] = []
parse_name = name
parser_state = 1
def split_value(name):
if name in changes:
changes[name] = changes[name].split()
def split_group_values(name):
if name in changes:
group = {}
for i, ln in enumerate(changes[name][:]):
vals = ln.split()
group[vals[-1]] = vals[:-1]
changes[name] = group
split_value('Architecture')
split_value('Binary')
split_group_values('Files')
split_group_values('Checksums-Sha1')
split_group_values('Checksums-Sha256')
changes['_'] = {
'filename': filename,
'basepath': os.path.dirname(filename)
}
return changes
def hashfile(fp, method, blocksize=65536):
hash = getattr(hashlib, method)()
buf = fp.read(blocksize)
while len(buf) > 0:
hash.update(buf)
buf = fp.read(blocksize)
return hash.hexdigest()
def check_file(basepath, filename, info, method):
checksum = info[0]
size = int(info[1])
filepath = os.path.join(basepath, filename)
if not os.path.exists(filepath):
return None
stats = os.stat(filepath)
if stats.st_size != size:
return 'File size mismatch'
with open(filepath, 'rb') as f:
hash = hashfile(f, method)
if hash != checksum:
return '%s checksum failed' % (method, )
return 'OK'
def check_files(changes, method):
if method == 'sha256':
key = 'Checksums-Sha256'
elif method == 'sha1':
key = 'Checksums-Sha1'
elif method == 'md5':
key = 'Files'
files = changes[key]
basepath = changes['_']['basepath']
result = {filename: check_file(basepath, filename, info, method) for filename, info in files.items()}
changes['Checksum-Result'] = result
#######
input_file = sys.argv[1]
change_file = input_file + '.processing'
os.rename(input_file, change_file)
print('Importing packages from {}'.format(input_file))
changes = parse_changes(change_file)
for alg in ('sha256', 'sha1', 'md5'):
if alg in hashlib.algorithms_available:
check_files(changes, alg)
changes['_']['algorithm'] = alg
break
check_result = changes['Checksum-Result']
checksum_ok = all(v in ('OK', None) for v in check_result.values())
if not checksum_ok:
print('One or more files could not be processed:')
for filename, result in check_result.items():
print(' {}: {}'.format(filename, result))
sys.exit(1)
dist = changes['Distribution']
files = check_result.keys()
print('Found files to import to {}:'.format(dist))
for file in files:
print(' ', file)
basepath = changes['_']['basepath']
filepaths = [os.path.join(basepath, fn) for fn in files]
repo = REPO_FORMAT.format(dist=dist)
print('Preparing to import to {}...'.format(repo))
cmd = ['aptly', 'repo', 'add', repo]
for fn in filepaths[:]:
if not os.path.exists(fn):
# probably already processed
filepaths.remove(fn)
continue
try:
subprocess.check_call(cmd + [fn])
except OSError:
print('Import failed!')
sys.exit(2)
for fn in (change_file, ) + tuple(filepaths):
os.unlink(fn)
print('Import complete.')
@bersace
Copy link

bersace commented Jun 28, 2018

FTR, aptly now offer aptly repo include to manage .changes.

https://www.aptly.info/doc/aptly/repo/include/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment