Created
March 13, 2020 16:19
-
-
Save bentwire/60b92136ca8c1369c16d72ed1ea245c2 to your computer and use it in GitHub Desktop.
libapt-pkg in chroot...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# This is an attempt to build a Python work-alike of multistrap.
# It currently lacks many features.
# Dependency selection is rudimentary and still needs proper version selection.
import apt | |
import os | |
import sys | |
import subprocess | |
import argparse | |
import pathlib | |
import aptsources | |
from aptsources.sourceslist import SourceEntry | |
from pathlib import Path | |
from softwareproperties.SoftwareProperties import SoftwareProperties, shortcut_handler | |
from softwareproperties.shortcuts import ShortcutException | |
from apt import debfile | |
from apt_pkg import config, init_system, PackageManager, DepCache | |
from ruamel.yaml import YAML | |
from time import sleep | |
ANSI_GREEN="\u001b[32;1m" | |
ANSI_RED="\u001b[31;1m" | |
ANSI_RESET="\u001b[0m" | |
class InstallProgress(apt.progress.base.InstallProgress):
    """Install-progress handler that unpacks packages into a foreign root.

    Every progress callback simply echoes its arguments to stdout, and
    ``run()`` is overridden so that ``dpkg --unpack`` is invoked directly
    with ``--root``/``--admindir`` pointing below ``root``.
    """

    def __init__(self, root):
        """Store ``root``, the directory handed to dpkg's ``--root`` option."""
        super().__init__()
        self.root = root

    def conffile(self, current, new):
        """Print the two conffile names passed to the callback."""
        print("CONFFILE: " + current + " " + new)

    def error(self, pkg, errmsg):
        """Print the package name and error message passed to the callback."""
        print("ERROR: " + pkg + " :" + errmsg)

    def processing(self, pkg, stage):
        """Print the package name and processing stage."""
        print("PROCESSING: " + pkg + " :" + stage)

    def dpkg_status_change(self, pkg, status):
        """Print the package name and its new dpkg status."""
        print("DPKG STATUS CHANGE: " + pkg + " :" + status)

    def status_change(self, pkg, pct, status):
        """Print the package name, progress percentage and status text."""
        # pct is converted explicitly: concatenating a number to a str
        # would raise TypeError.
        print("STATUS CHANGE: " + pkg + " :" + str(pct) + " :" + status)

    def start_update(self):
        """Do nothing when an update starts."""
        pass

    def finish_update(self):
        """Do nothing when an update finishes."""
        pass

    def run(self, obj):
        """Unpack the .deb file ``obj`` below ``self.root`` with dpkg.

        This override exists so that the correct flags can be passed to
        dpkg. Unlike the method it replaces, it never treats ``obj`` as a
        package-manager object: ``obj`` is always handed to dpkg as the
        archive to unpack.

        Args:
            obj: path of the .deb file to unpack.

        Returns:
            The exit status of dpkg; 0 means that there were no problems.
        """
        print("RUN: " + str(obj))
        env = {
            "DEBIAN_FRONTEND": "noninteractive",
            "DEBCONF_NONINTERACTIVE_SEEN": "true",
            "LC_ALL": "C",
            "LANGUAGE": "C",
            "LANG": "C",
            "PATH": "/:/bin:/sbin:/usr/bin:/usr/sbin",
        }
        args = [
            "/usr/bin/dpkg",
            "--debug=002",
            "--force-all",
            "--root=" + self.root,
            "--admindir=" + self.root + "/var/lib/dpkg",
            "--unpack",
            obj,
        ]
        ret = subprocess.run(args, env=env)
        # Any non-zero status is a failure; a negative value means dpkg was
        # terminated by a signal.
        if ret.returncode != 0:
            print(ANSI_RED + "Failed to extract: " + str(obj))
            print("\tRET: " + str(ret.returncode) + ANSI_RESET)
        # TODO: after the extraction some files need to be manipulated so
        # that apt and friends think the package is extracted and ready to
        # be configured; see multistrap (starting at line 646) for an
        # example of how to do this.
        return ret.returncode
class DistBuilder(object): | |
# Various apt/dpkg related constants | |
CACHEDIR = "var/cache/apt/" # archives | |
LIBDIR = "var/lib/apt/" # lists | |
LOGDIR = "var/log/" # logs | |
ETCDIR = "etc/apt/" # sources | |
DPKGDIR = "var/lib/dpkg/" # state | |
def __init__(self, root): | |
self.root = pathlib.PurePath(root + "/") | |
self.cachedir = pathlib.PurePath(self.root, self.CACHEDIR) | |
self.libdir = pathlib.PurePath(self.root, self.LIBDIR) | |
self.logdir = pathlib.PurePath(self.root, self.LOGDIR) | |
self.etcdir = pathlib.PurePath(self.root, self.ETCDIR) | |
self.dpkgdir = pathlib.PurePath(self.root, self.DPKGDIR) | |
# Initialize cache | |
self.cache = apt.Cache(rootdir=str(self.root)) | |
self.yaml = YAML() | |
self.to_install = {} | |
self.conflicts = {} | |
def exec(self, args=[]): | |
ret = subprocess.run(args) | |
return ret.returncode | |
def exec_chroot(self, cargs=[], env=None): | |
args = ["chroot", str(self.root)] | |
for arg in cargs: | |
args.append(arg) | |
print(args) | |
ret = subprocess.run(args, env=env) | |
return ret.returncode | |
def exec_preinst(self): | |
env = {"DEBIAN_FRONTEND": "noninteractive", "DEBCONF_NONINTERACTIVE_SEEN": "true", "LC_ALL": "C", "LANGUAGE": "C", "LANG": "C", "PATH": "/bin:/sbin:/usr/bin:/usr/sbin"} | |
preinst_scripts = Path(self.dpkgdir, "info/").glob("*.preinst") | |
print("PRE:") | |
for script in preinst_scripts: | |
relscript = "/var/lib/dpkg/info/" + script.name | |
if "bin/sh" in script.read_text(encoding='iso-8859-1', errors='ignore'): | |
print("\tRUN: " + str(relscript)) | |
self.exec_chroot(["/bin/bash", str(relscript), "install"], env=env) | |
self.exec_chroot(["dpkg", "--configure", "-a"], env=env) | |
def init_chroot(self): | |
# Initialize and create dirs and files required in chroot. | |
Path(self.cachedir).mkdir(parents=True, exist_ok=True) | |
Path(self.libdir, "lists/").mkdir(parents=True, exist_ok=True) | |
Path(self.logdir).mkdir(parents=True, exist_ok=True) | |
Path(self.etcdir).mkdir(parents=True, exist_ok=True) | |
Path(self.dpkgdir).mkdir(parents=True, exist_ok=True) | |
self.sourcesparts = Path(self.root, "etc/apt/sources.list.d/").mkdir(parents=True, exist_ok=True) | |
self.trustedparts = Path(self.root, "etc/apt/trusted.gpg.d/").mkdir(parents=True, exist_ok=True) | |
self.confparts = Path(self.root, "etc/apt/apt.conf.d/").mkdir(parents=True, exist_ok=True) | |
self.preferencesparts = Path(self.root, "etc/apt/preferences.d/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "etc/network/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "usr/share/info/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "usr/bin/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "dev/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "proc/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "sys/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "tmp/").mkdir(parents=True, exist_ok=True) | |
Path(self.root, "var/log/").mkdir(parents=True, exist_ok=True) | |
Path(self.dpkgdir, "updates/").mkdir(parents=True, exist_ok=True) | |
Path(self.dpkgdir, "info/").mkdir(parents=True, exist_ok=True) | |
Path(self.dpkgdir, "tmp.ci/").mkdir(parents=True, exist_ok=True) | |
self.available_file = Path(self.dpkgdir, "available") | |
self.status_file = Path(self.dpkgdir, "status") | |
self.diversions_file = Path(self.dpkgdir, "diversions") | |
self.statoverride_file = Path(self.dpkgdir, "statoverride") | |
self.arch_file = Path(self.dpkgdir, "arch") | |
self.pkgcache = Path(self.root, "var/cache/apt/archives/") | |
self.sources = Path(self.etcdir, "sources.list") | |
self.logfile = Path(self.logdir, "dpkg.log") | |
self.logfile.touch() | |
self.available_file.touch() | |
self.arch_file.touch() | |
self.statoverride_file.touch() | |
self.diversions_file.touch() | |
self.status_file.touch() | |
def init_apt(self): | |
os.environ["DEBIAN_FRONTEND"] = "noninteractive" | |
os.environ["DEBCONF_NONINTERACTIVE_SEEN"] = "true" | |
# Create apt config, config is from the apt package. | |
config.set("Apt::Architecture", "amd64") # This package will ONLY install amd64 + i386 multi-arch. | |
config.set("Apt::Architectures", "amd64,i386") | |
config.set("Apt::Install-Recommends", "false") | |
config.set("Apt::Get::Download-Only", "true") | |
#config.set("Dir::Log", str(logdir)) | |
#config.set("Dir::Etc", str(etcdir)) | |
#config.set("Dir::Etc::TrustedParts", str(trustedparts)) | |
#config.set("Dir::Etc::SourceList", str(sources)) | |
config.set("Dir::Bin::dpkg", "/usr/bin/dpkg") | |
#config.set("Dpkg::Options::", "--dry-run") | |
config.set("Dpkg::Options::", "--root="+str(self.root)) | |
config.set("Dpkg::Options::", "--instdir="+str(self.root)) | |
config.set("Dpkg::Options::", "--admindir="+str(self.dpkgdir)) | |
config.set("Dpkg::Options::", "--log="+str(self.logfile)) | |
#config.clear("DPkg::Pre-Install-Pkgs") | |
#print("DUMP CONFIG START") | |
#print(config.dump()) | |
#print("DUMP CONFIG END") | |
init_system() # Force config reread | |
self.cache.open() | |
self.software_properties = SoftwareProperties(rootdir=str(self.root)) # Allow parsing and creation of sources.list files | |
#distro = aptsources.distro.get_distro() | |
#distro.get_sources(sp.sourceslist) | |
def set_apt_unauth(self): | |
config.set("Apt::Get::AllowUnauthenticated", "true") | |
config.set("Acquire::AllowInsecureRepositories", "1") | |
init_system() # Force config reread | |
self.cache.open() | |
def set_apt_auth(self): | |
config.set("Apt::Get::AllowUnauthenticated", "false") | |
config.set("Acquire::AllowInsecureRepositories", "0") | |
init_system() # Force config reread | |
self.cache.open() | |
def setup_sources(self): | |
# This part must be done unauthenticated | |
self.set_apt_unauth() | |
# Get sources and add | |
keydebs = [] | |
keystores = [] | |
sources_file = Path("sources.yaml") | |
sources = self.yaml.load(sources_file) | |
for source in sources['sources']: | |
print("Source: " + source['shortcut']) | |
shortcut = shortcut_handler(source['shortcut']) | |
shortcut.add_key() | |
self.software_properties.add_source_from_shortcut(shortcut, True) | |
try: | |
keyfile = source['key'] | |
kfp = Path(keyfile) | |
# print(kfp) | |
if kfp.suffix is '': # We have a package name | |
if keydebs.count(keyfile) == 0: | |
keydebs.append(keyfile) | |
elif kfp.suffix is '.gpg': # We have a keyring | |
if keystores.count(keyfile) == 0: | |
keystores.append(keyfile) | |
else: | |
print("I don't know what to do with: " + keyfile) | |
except KeyError: | |
pass # We don't care if there is no key, it may be a ppa which installs keys different. | |
self.software_properties.sourceslist.save() | |
# If I don't do this twice it never sees the update (??) | |
self.cache.update(fetch_progress=apt.progress.text.AcquireProgress()) | |
self.cache.open() | |
self.cache.update(fetch_progress=apt.progress.text.AcquireProgress()) | |
self.cache.open() | |
# Get keyring packages (and any deps, but we ignore those for now) | |
if len(keydebs) > 0: | |
for keydeb in keydebs: | |
pkg = self.cache[keydeb] | |
pkg.mark_install() | |
# Only fetch, we have to do somewhat of a manual process to install in the chroot. | |
self.cache.fetch_archives(progress=apt.progress.text.AcquireProgress(), allow_unauthenticated=True) | |
# Extract keyring packages that were fetched above. | |
for pkg in keydebs: | |
foo = self.pkgcache.glob(pkg + "*.deb") | |
for item in foo: | |
print("PKG: " + str(item)) | |
df = debfile.DebPackage(str(item), self.cache) | |
if df.check(): | |
print("Check!") | |
df.install(install_progress=InstallProgress(str(self.root))) | |
# Now that the keyrings are installed we can go back to authenticated repos. | |
self.set_apt_auth() | |
# Update and clear all previously selected packages. | |
self.cache.update(fetch_progress=apt.progress.text.AcquireProgress()) | |
self.cache.open() | |
self.cache.clear() | |
self.dump_config() | |
def select_packages(self): | |
# Get requested packages. | |
packages_file = Path("packages.yaml") | |
packages = self.yaml.load(packages_file) | |
for package in packages['packages']: | |
pkg = self.cache[package] | |
pkg.mark_install() | |
cand = pkg.candidate | |
print("Deps and Suggestions for package: " + str(pkg)) | |
for deps in cand.dependencies: | |
for dep in deps.target_versions: | |
depname = dep.record['Package'] | |
if dep.priority in 'important' or dep.priority in 'required': | |
print("Got DEP: " + depname + " PRI: " + str(dep.priority)) | |
try: | |
deppkg = self.cache[depname] | |
if deppkg.is_now_broken: | |
print("EE: Pkg: " + depname + " IS BROKEN") | |
sys.exit(-1) | |
deppkg.mark_auto() | |
deppkg.mark_install(from_user=False) | |
except KeyError: | |
print("This DEP does not exist: " + depname) | |
for deps in cand.recommends: | |
for dep in deps.target_versions: | |
depname = dep.record['Package'] | |
print("Got REC: " + depname) | |
try: | |
deppkg = self.cache[depname] | |
deppkg.mark_auto() | |
deppkg.mark_install(from_user=False) | |
except KeyError: | |
print("This REC does not exist: " + depname) | |
# Check for essential packages | |
for pkgname in self.cache.keys(): | |
if self.cache[pkgname].essential: | |
pkg = self.cache[pkgname] | |
# print("ARCH: " + pkg.architecture()) | |
if pkg.architecture() not in "amd64": | |
# print("Incorrect ARCH: " + pkg.architecture()) | |
continue | |
print("Got ESS: " + pkgname) | |
pkg.mark_auto() | |
pkg.mark_install(from_user=False) | |
# Fetch all selected packages | |
self.cache.fetch_archives(progress=apt.progress.text.AcquireProgress())#, allow_unauthenticated=True) | |
def select_package_for_extraction(self, pkgfile): | |
if self.to_install.get(pkgfile, None) is None: | |
print("Selecting: " + pkgfile + " For extraction.") | |
df = debfile.DebPackage(pkgfile, self.cache) | |
self.to_install[pkgfile] = {"package": df, "file": pkgfile, "check": False } | |
if not df.check_conflicts(): | |
print("Conflicts detected in package: " + pkgfile) | |
self.conflicts[pkgfile] = [] | |
for conflict in df.conflicts: | |
self.conflicts.append(conflict) | |
print("\t\t" + str(conflict)) | |
if df.check(): | |
self.to_install[str(pkgfile)]['check'] = True | |
else: | |
print("Missing Deps: " + str(df.required_changes)) | |
(required_install, required_remove, _) = df.required_changes | |
for req in required_install: | |
pkgreq = self.cache[req] | |
if not pkgreq.marked_install: | |
pkgreq.mark_auto() | |
pkgreq.mark_install(from_user=False) | |
# self.cache.fetch_archives(progress=apt.progress.text.AcquireProgress())#, allow_unauthenticated=True) | |
files = self.pkgcache.glob(req + "*.deb") | |
if files: | |
for file in files: | |
self.select_package_for_extraction(str(file)) | |
def select_all_packages_for_extraction(self): | |
pkgfiles = self.pkgcache.glob("*.deb") | |
for pkg in pkgfiles: | |
self.select_package_for_extraction(str(pkg)) | |
def dump_config(self): | |
cfgfile = Path("apt.conf") | |
try: | |
cfgfile.unlink() | |
except FileNotFoundError: | |
pass | |
cfgfile.write_text(config.dump()) | |
def extract_selections(self): | |
# extract all the downloaded packages | |
self.toextract = [] | |
for name in self.cache.keys(): | |
pkg = self.cache[name] | |
if pkg.marked_install: | |
print("Package: " + pkg.name + " is downloaded") | |
self.toextract.append(name) | |
cfgfile = Path("apt.conf") | |
try: | |
cfgfile.unlink() | |
except FileNotFoundError: | |
pass | |
cfgfile.write_text(config.dump()) | |
# env = {"DEBIAN_FRONTEND": "noninteractive", "DEBCONF_NONINTERACTIVE_SEEN": "true", "LC_ALL": "C", "LANGUAGE": "C", "LANG": "C", "APT_CONFIG": str(cfgfile)} | |
# #print(env) | |
# args = ["apt-get", "-o Apt::Architecture=" + ARCH, | |
# "-o Apt::Architectures=amd64,i386", | |
# "-o Apt::Install-Recommends=false", | |
# "-o Dir=" + str(ROOT), | |
# "-o Dir::State=" + str(libdir), | |
# "-o Dir::State::Status=" + str(dpkgdir) + "/status", | |
# "-o Dir::Cache=" + str(cachedir), | |
# "-o Dir::Etc=" + str(etcdir), | |
# "-o Dpkg::Options::=--root="+str(ROOT), | |
# "-o Dpkg::Options::=--instdir="+str(ROOT), | |
# "-o Dpkg::Options::=--admindir="+str(dpkgdir), | |
# "-o Dpkg::Options::=--log="+str(logfile), | |
# "install", "-y"] | |
# args.extend(toextract) | |
# #print(args) | |
# ret = subprocess.run(args, env=env) | |
print("Selected for extraction: ") | |
for pkg, data in self.to_install.items(): | |
print("Pkg: " + pkg) | |
if data['check']: | |
print(ANSI_GREEN+"\tCheck: OK"+ANSI_RESET) | |
else: | |
print(ANSI_RED+"\tCheck: FAIL"+ANSI_RESET) | |
if data['package'].check(): | |
print(ANSI_GREEN+"\tReCheck: OK"+ANSI_RESET) | |
else: | |
print(ANSI_RED+"\tReCheck: FAIL") | |
(required_install, required_remove, unauthenticated) = data['package'].required_changes | |
print("\t\tINS: " + str(required_install)) | |
print("\t\tREM: " + str(required_remove)) | |
print(ANSI_RESET) | |
print("Conflicts: ") | |
for pkg, conflicts in self.conflicts.items(): | |
print("\tPkg:" + pkg) | |
print("Start extraction") | |
for pkg, data in self.to_install.items(): | |
print("Extracting: " + pkg) | |
data['package'].install(install_progress=InstallProgress(str(self.root))) | |
# Parse args, and/or use config file
parser = argparse.ArgumentParser(description="Multistrap replacement")
parser.add_argument("--root", help="Path to root of the OS image", required=True)
args = parser.parse_args()
print(args)

# Build the chroot: create the skeleton, configure apt, add the sources,
# download the selected packages and unpack them.
builder = DistBuilder(args.root)
builder.init_chroot()
builder.init_apt()
builder.setup_sources()
builder.select_packages()
builder.select_all_packages_for_extraction()
builder.extract_selections()

# After extraction we need to setup the chroot, mount /dev, /proc, /sys etc,
# create /bin/sh link from shell.
# Each entry is (runner, command, message printed on failure); the steps run
# in order and the first failure aborts with exit status -1.
setup_steps = [
    (builder.exec_chroot, ['ln', '-sf', '/bin/bash', '/bin/sh'],
     "EE: link to /bin/sh failed!"),
    (builder.exec, ['mount', '--bind', '/dev', args.root + "/dev"],
     "EE: bind mount of /dev failed!"),
    (builder.exec_chroot, ['mount', '-t', 'devpts', 'devpts', '/dev/pts'],
     "EE: /dev/pts mount failed!"),
    (builder.exec_chroot, ['mount', '-t', 'proc', 'proc', '/proc'],
     "EE: /proc mount failed!"),
    (builder.exec_chroot, ['mount', '-t', 'sysfs', 'sys', '/sys'],
     "EE: /sys mount failed!"),
]
for runner, command, failure_message in setup_steps:
    # Exit statuses are compared by value; "is not 0" tested identity.
    ret = runner(command)
    if ret != 0:
        print(failure_message)
        sys.exit(-1)

builder.exec_preinst()

# Unmounting is best effort: a failure is reported but the remaining
# mount points are still attempted.
for mountpoint in ('/proc', '/sys', '/dev/pts', '/dev'):
    ret = builder.exec_chroot(['umount', mountpoint])
    if ret != 0:
        print("EE: " + mountpoint + " umount failed!")
sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment