# Build an index mapping GNU build-ids to Debian debug/binary package URLs.
# (GitHub gist page boilerplate removed.)
import re
import os
import io
import json
import lzma
import email
import pickle
import hashlib
import tarfile
import binascii
import arfile
import base64
from functools import update_wrapper
from urllib.request import urlopen
from urllib.parse import urljoin
from itertools import chain
# Matches paths like ".../.build-id/ab/cdef0123....debug" inside a debug .deb:
# group 1 is the first two hex digits of the build-id, group 2 the remainder.
debug_file_re = re.compile(r'/\.build-id/([a-f0-9]{2})/([a-f0-9]{10,})\.debug$')
# Matches a Debian "Source" field of the form "name (version)";
# groups are (name, version).
embedded_version_re = re.compile(r'^(.*?)\s+\((.*?)\)$')
def parse_package_file(f):
    """Yield one dict per blank-line-separated RFC-822-style stanza in *f*.

    *f* must iterate byte lines. A trailing empty-line sentinel is chained on
    so the final stanza is flushed even without a terminating blank line.
    """
    pending = []
    for raw in chain(f, [b'']):
        raw = raw.rstrip(b'\r\n')
        if raw:
            pending.append(raw)
            continue
        # Blank line: emit the accumulated stanza, if any.
        if pending:
            stanza = email.message_from_bytes(b'\n'.join(pending))
            yield dict(stanza.items())
            pending = []
class DebugFile(object):
    """A single extracted .debug file: its GNU build-id plus raw contents."""

    def __init__(self, build_id, data):
        self.build_id = build_id
        self.data = data

    def __repr__(self):
        return '<DebugFile build_id=%r (%d bytes)>' % (self.build_id, len(self.data))
class PackageRef(object):
    """Reference to one binary package plus the URL of its debug .deb archive."""

    def __init__(self, dist, component, name, arch, version, source, debug_archive_url):
        self.dist = dist
        self.component = component
        self.name = name
        self.arch = arch
        self.version = version
        # Debian "Source" field: either "name" or "name (version)".
        self.source = source
        self.debug_archive_url = debug_archive_url

    @property
    def source_package_ptr(self):
        """Lookup key for the owning source package: (arch, name, version).

        Must match the 3-tuple keys built in Repository.build_index.
        BUG FIX: the original returned a bare 2-tuple (name, version) when
        the Source field embedded a version, which could never match a
        3-tuple key, so every such lookup reported 'missing'.
        """
        match = embedded_version_re.match(self.source)
        if match is not None:
            return (self.arch,) + match.groups()
        return self.arch, self.source, self.version

    def iter_debug_files(self):
        """Download the debug .deb and yield a DebugFile per .build-id member."""
        with urlopen(self.debug_archive_url) as f:
            ar = arfile.ArFile(fileobj=io.BytesIO(f.read()))
            zdata = ar.extractfile('data.tar.xz')
            archive = tarfile.open(fileobj=lzma.LZMAFile(zdata))
            for info in archive.getmembers():
                match = debug_file_re.search(info.name)
                if match is None:
                    continue
                # The path encodes the build-id as <2 hex chars>/<rest>.debug.
                build_id = match.group(1) + match.group(2)
                yield DebugFile(build_id, archive.extractfile(info).read())

    def iter_binaries(self):
        # NOTE(review): unfinished stub — self.binary_archive_url is never
        # assigned anywhere in this file, so calling this raises
        # AttributeError. Left as-is pending the author's intent.
        with urlopen(self.binary_archive_url) as f:
            pass

    def __repr__(self):
        return '<PackageRef dist=%r component=%r name=%r arch=%r version=%r>' % (
            self.dist,
            self.component,
            self.name,
            self.arch,
            self.version,
        )
class Release(object):
    """One dist (e.g. 'stable') of a Repository.

    Fetches the dist's Release file at construction time (network I/O) and
    exposes the package / build-id indexes derived from its Packages.xz files.
    """

    def __init__(self, repo, dist):
        self.repo = repo
        self.dist = dist
        # The Release file is a single RFC-822-style header block.
        # FIX: base_url already ends with '/'; the original appended
        # '/Release', producing a '//' in the requested URL.
        with urlopen(self.base_url + 'Release') as f:
            meta = email.message_from_binary_file(f)
            self.release_info = dict(meta.items())

    @property
    def base_url(self):
        """Root URL of the (non-debug) dist; always ends with '/'."""
        return '%sdists/%s/' % (self.repo.binary_index, self.dist)

    @property
    def debug_base_url(self):
        """Root URL of the matching '-debug' dist; always ends with '/'."""
        return '%sdists/%s-debug/' % (self.repo.debug_index, self.dist)

    @property
    def archs(self):
        """Architectures advertised by the Release file, restricted to those
        the Repository was configured with."""
        archs = self.release_info['Architectures'].split()
        return frozenset(x for x in archs if x in self.repo.archs)

    @property
    def components(self):
        """All components (main, contrib, ...) advertised by the Release file."""
        return frozenset(self.release_info['Components'].split())

    def iter_build_id_indexes(self):
        """Yield one {build_id: PackageRef} dict per (component, arch) pair."""
        for component in self.components:
            for arch in self.archs:
                yield self._get_build_id_index(component, arch)

    def iter_non_debug_packages(self):
        """Yield package stanzas (dicts) for every binary package, plus the
        debug-dist stanzas that carry no Build-Ids field."""
        for component in self.components:
            for arch in self.archs:
                url = urljoin(self.base_url, '%s/binary-%s/Packages.xz' % (
                    component,
                    arch,
                ))
                for package in self._iter_packages(url):
                    yield package
                url = urljoin(self.debug_base_url, '%s/binary-%s/Packages.xz' % (
                    component,
                    arch,
                ))
                for package in self._iter_packages(url):
                    if not package.get('Build-Ids'):
                        yield package

    def _iter_packages(self, url):
        """Stream and parse an xz-compressed Packages file."""
        with urlopen(url) as zf:
            with lzma.LZMAFile(zf) as f:
                for package in parse_package_file(f):
                    yield package

    def _get_build_id_index(self, component, arch):
        """Build {build_id: PackageRef} from one debug Packages.xz index."""
        url = urljoin(self.debug_base_url, '%s/binary-%s/Packages.xz' % (
            component,
            arch,
        ))
        index = {}
        for package in self._iter_packages(url):
            package_ref = PackageRef(
                dist=self.dist,
                component=component,
                arch=arch,
                name=package['Package'],
                version=package['Version'],
                # FIX: Debian omits the Source field when it equals the
                # package name; the original raised KeyError on such stanzas.
                source=package.get('Source', package['Package']),
                debug_archive_url=urljoin(self.repo.debug_index, package['Filename']),
            )
            for build_id in package.get('Build-Ids', '').split():
                index[build_id] = package_ref
        return index

    def __repr__(self):
        return '<Release dist=%r archs=%r components=%r>' % (
            self.dist,
            sorted(self.archs),
            sorted(self.components),
        )
class Repository(object):
    """A Debian-style mirror pair: a binary index plus its debug-symbol index."""

    def __init__(self, binary_index, debug_index, dists, archs, cache_dir):
        self.binary_index = binary_index
        self.debug_index = debug_index
        self.dists = dists
        self.archs = archs
        # NOTE(review): cache_dir is stored but never read in this file.
        self.cache_dir = cache_dir

    def build_index(self, filename):
        """Index every configured dist.

        Writes one file per build-id under index/build-id/ (containing the
        package key) and one JSON file per package under index/pkg/
        ({'dbg': debug-archive URL, 'bin': binary-package URL when found}).

        NOTE(review): *filename* is accepted but never used — output paths
        are hard-coded; parameter kept for interface compatibility.
        """
        build_ids = {}   # build_id -> package_key
        packages = {}    # package_key -> {'dbg': url[, 'bin': url]}
        for release in self.iter_releases():
            # (arch, name, version) -> binary package URL; also keyed by the
            # embedded source name/version when a Source field is present.
            all_packages = {}
            for package in release.iter_non_debug_packages():
                url = urljoin(self.binary_index, package['Filename'])
                all_packages[package['Architecture'], package['Package'], package['Version']] = url
                if 'Source' in package:
                    match = embedded_version_re.match(package['Source'])
                    if match is not None:
                        all_packages[(package['Architecture'],) + match.groups()] = url
            for index in release.iter_build_id_indexes():
                for build_id, pkg_ref in index.items():
                    # Short URL-safe key derived from the debug archive URL
                    # (md5 is used only as a stable hash, not for security).
                    package_key = base64.urlsafe_b64encode(hashlib.md5(
                        pkg_ref.debug_archive_url.encode('utf-8')).digest()).decode('utf-8').rstrip('=')
                    build_ids[build_id] = package_key
                    if package_key not in packages:
                        packages[package_key] = {
                            'dbg': pkg_ref.debug_archive_url,
                        }
                        try:
                            packages[package_key]['bin'] = all_packages[
                                pkg_ref.source_package_ptr]
                        except LookupError:
                            # Best-effort: leave 'bin' unset when no matching
                            # binary package was found.
                            print('missing', pkg_ref, pkg_ref.source_package_ptr)
        # FIX: exist_ok=True replaces a bare `except OSError: pass`, which
        # also swallowed real failures such as EACCES.
        os.makedirs('index/build-id', exist_ok=True)
        for build_id, pkg in build_ids.items():
            with open('index/build-id/%s' % build_id, 'w') as f:
                f.write(pkg)
        os.makedirs('index/pkg', exist_ok=True)
        for pkg, meta in packages.items():
            with open('index/pkg/%s' % pkg, 'w') as f:
                json.dump(meta, f)

    def iter_releases(self):
        """Yield a Release per configured dist (performs network I/O per dist)."""
        for dist in self.dists:
            yield Release(self, dist)
# Official Debian mirrors: binary packages plus their debug-symbol archive.
debian_official = Repository(
    binary_index='http://ftp.debian.org/debian/',
    debug_index='http://debug.mirrors.debian.org/debian-debug/',
    dists=['stable'],
    archs=['amd64', 'i386'],
    cache_dir='cache',
)

if __name__ == '__main__':
    # FIX: guard the entry point so merely importing this module does not
    # kick off a full network crawl and filesystem write.
    debian_official.build_index('index.json')