Skip to content

Instantly share code, notes, and snippets.

@bentwire
Created March 13, 2020 16:19
Show Gist options
  • Save bentwire/60b92136ca8c1369c16d72ed1ea245c2 to your computer and use it in GitHub Desktop.
Save bentwire/60b92136ca8c1369c16d72ed1ea245c2 to your computer and use it in GitHub Desktop.
libapt-pkg in chroot...
#!/usr/bin/python3
# This is an attempt to build a python work-alike of multistrap.
# It currently lacks many features
# Dependency selection ... is crap. Needs proper version selection
import apt
import os
import sys
import subprocess
import argparse
import pathlib
import aptsources
from aptsources.sourceslist import SourceEntry
from pathlib import Path
from softwareproperties.SoftwareProperties import SoftwareProperties, shortcut_handler
from softwareproperties.shortcuts import ShortcutException
from apt import debfile
from apt_pkg import config, init_system, PackageManager, DepCache
from ruamel.yaml import YAML
from time import sleep
ANSI_GREEN="\u001b[32;1m"
ANSI_RED="\u001b[31;1m"
ANSI_RESET="\u001b[0m"
class InstallProgress(apt.progress.base.InstallProgress):
def __init__(self, root):
super(InstallProgress, self).__init__()
self.root = root
def conffile(self, current, new):
print("CONFFILE: " + current + " " + new)
def error(self, pkg, errmsg):
print("ERROR: " + pkg + " :" + errmsg)
def processing(self, pkg, stage):
print("PROCESSING: " + pkg + " :" + stage)
def dpkg_status_change(self, pkg, status):
print("DPKG STATUS CHANGE: " + pkg + " :" + status)
def status_change(self, pkg, pct, status):
print("STATUS CHANGE: " + pkg + " :" + pct + " :" +status)
def start_update(self):
pass #print("START UPDATE")
def finish_update(self):
pass #print("FINISH UPDATE")
def run(self, obj): # Override this to make sure we can pass the correct flags to dpkg
print("RUN: " + obj)
"""Install using the object 'obj'.
This functions runs install actions. The parameter 'obj' may either
be a PackageManager object in which case its do_install() method is
called or the path to a deb file.
If the object is a PackageManager, the functions returns the result
of calling its do_install() method. Otherwise, the function returns
the exit status of dpkg. In both cases, 0 means that there were no
problems.
"""
env = {"DEBIAN_FRONTEND": "noninteractive", "DEBCONF_NONINTERACTIVE_SEEN": "true", "LC_ALL": "C", "LANGUAGE": "C", "LANG": "C", "PATH": "/:/bin:/sbin:/usr/bin:/usr/sbin"}
args = ["/usr/bin/dpkg", "--debug=002", "--force-all", "--root="+self.root, "--admindir="+self.root+"/var/lib/dpkg", "--unpack", obj]
#args = ["/usr/bin/dpkg", "-X", obj, self.root]
#ret = subprocess.run(args, stdout=self.write_stream.fileno(), stderr=self.write_stream.fileno())
ret = subprocess.run(args, env=env)
#print(ret.stdout)
if ret.returncode > 0:
print(ANSI_RED + "Failed to extract: " + obj)
print("\tRET: " + str(ret.returncode)+ANSI_RESET)
return ret.returncode
# After the extraction we need to manipulate some files so that apt and friends think the package is extracted and ready to be configured
# check starting at line 646 in multistrap for an example of how to do this.
# args = ["/usr/bin/dpkg", "--root="+self.root, "-A", obj]
# ret = subprocess.run(args, env=env)
#
# #print(ret.stdout)
# if ret.returncode > 0:
# print(ANSI_RED + "Failed to mark: " + obj)
# print("\tRET: " + str(ret.returncode)+ANSI_RESET)
# return ret.returncode
return ret.returncode
class DistBuilder(object):
# Various apt/dpkg related constants
CACHEDIR = "var/cache/apt/" # archives
LIBDIR = "var/lib/apt/" # lists
LOGDIR = "var/log/" # logs
ETCDIR = "etc/apt/" # sources
DPKGDIR = "var/lib/dpkg/" # state
def __init__(self, root):
self.root = pathlib.PurePath(root + "/")
self.cachedir = pathlib.PurePath(self.root, self.CACHEDIR)
self.libdir = pathlib.PurePath(self.root, self.LIBDIR)
self.logdir = pathlib.PurePath(self.root, self.LOGDIR)
self.etcdir = pathlib.PurePath(self.root, self.ETCDIR)
self.dpkgdir = pathlib.PurePath(self.root, self.DPKGDIR)
# Initialize cache
self.cache = apt.Cache(rootdir=str(self.root))
self.yaml = YAML()
self.to_install = {}
self.conflicts = {}
def exec(self, args=[]):
ret = subprocess.run(args)
return ret.returncode
def exec_chroot(self, cargs=[], env=None):
args = ["chroot", str(self.root)]
for arg in cargs:
args.append(arg)
print(args)
ret = subprocess.run(args, env=env)
return ret.returncode
def exec_preinst(self):
env = {"DEBIAN_FRONTEND": "noninteractive", "DEBCONF_NONINTERACTIVE_SEEN": "true", "LC_ALL": "C", "LANGUAGE": "C", "LANG": "C", "PATH": "/bin:/sbin:/usr/bin:/usr/sbin"}
preinst_scripts = Path(self.dpkgdir, "info/").glob("*.preinst")
print("PRE:")
for script in preinst_scripts:
relscript = "/var/lib/dpkg/info/" + script.name
if "bin/sh" in script.read_text(encoding='iso-8859-1', errors='ignore'):
print("\tRUN: " + str(relscript))
self.exec_chroot(["/bin/bash", str(relscript), "install"], env=env)
self.exec_chroot(["dpkg", "--configure", "-a"], env=env)
def init_chroot(self):
# Initialize and create dirs and files required in chroot.
Path(self.cachedir).mkdir(parents=True, exist_ok=True)
Path(self.libdir, "lists/").mkdir(parents=True, exist_ok=True)
Path(self.logdir).mkdir(parents=True, exist_ok=True)
Path(self.etcdir).mkdir(parents=True, exist_ok=True)
Path(self.dpkgdir).mkdir(parents=True, exist_ok=True)
self.sourcesparts = Path(self.root, "etc/apt/sources.list.d/").mkdir(parents=True, exist_ok=True)
self.trustedparts = Path(self.root, "etc/apt/trusted.gpg.d/").mkdir(parents=True, exist_ok=True)
self.confparts = Path(self.root, "etc/apt/apt.conf.d/").mkdir(parents=True, exist_ok=True)
self.preferencesparts = Path(self.root, "etc/apt/preferences.d/").mkdir(parents=True, exist_ok=True)
Path(self.root, "etc/network/").mkdir(parents=True, exist_ok=True)
Path(self.root, "usr/share/info/").mkdir(parents=True, exist_ok=True)
Path(self.root, "usr/bin/").mkdir(parents=True, exist_ok=True)
Path(self.root, "dev/").mkdir(parents=True, exist_ok=True)
Path(self.root, "proc/").mkdir(parents=True, exist_ok=True)
Path(self.root, "sys/").mkdir(parents=True, exist_ok=True)
Path(self.root, "tmp/").mkdir(parents=True, exist_ok=True)
Path(self.root, "var/log/").mkdir(parents=True, exist_ok=True)
Path(self.dpkgdir, "updates/").mkdir(parents=True, exist_ok=True)
Path(self.dpkgdir, "info/").mkdir(parents=True, exist_ok=True)
Path(self.dpkgdir, "tmp.ci/").mkdir(parents=True, exist_ok=True)
self.available_file = Path(self.dpkgdir, "available")
self.status_file = Path(self.dpkgdir, "status")
self.diversions_file = Path(self.dpkgdir, "diversions")
self.statoverride_file = Path(self.dpkgdir, "statoverride")
self.arch_file = Path(self.dpkgdir, "arch")
self.pkgcache = Path(self.root, "var/cache/apt/archives/")
self.sources = Path(self.etcdir, "sources.list")
self.logfile = Path(self.logdir, "dpkg.log")
self.logfile.touch()
self.available_file.touch()
self.arch_file.touch()
self.statoverride_file.touch()
self.diversions_file.touch()
self.status_file.touch()
def init_apt(self):
os.environ["DEBIAN_FRONTEND"] = "noninteractive"
os.environ["DEBCONF_NONINTERACTIVE_SEEN"] = "true"
# Create apt config, config is from the apt package.
config.set("Apt::Architecture", "amd64") # This package will ONLY install amd64 + i386 multi-arch.
config.set("Apt::Architectures", "amd64,i386")
config.set("Apt::Install-Recommends", "false")
config.set("Apt::Get::Download-Only", "true")
#config.set("Dir::Log", str(logdir))
#config.set("Dir::Etc", str(etcdir))
#config.set("Dir::Etc::TrustedParts", str(trustedparts))
#config.set("Dir::Etc::SourceList", str(sources))
config.set("Dir::Bin::dpkg", "/usr/bin/dpkg")
#config.set("Dpkg::Options::", "--dry-run")
config.set("Dpkg::Options::", "--root="+str(self.root))
config.set("Dpkg::Options::", "--instdir="+str(self.root))
config.set("Dpkg::Options::", "--admindir="+str(self.dpkgdir))
config.set("Dpkg::Options::", "--log="+str(self.logfile))
#config.clear("DPkg::Pre-Install-Pkgs")
#print("DUMP CONFIG START")
#print(config.dump())
#print("DUMP CONFIG END")
init_system() # Force config reread
self.cache.open()
self.software_properties = SoftwareProperties(rootdir=str(self.root)) # Allow parsing and creation of sources.list files
#distro = aptsources.distro.get_distro()
#distro.get_sources(sp.sourceslist)
def set_apt_unauth(self):
config.set("Apt::Get::AllowUnauthenticated", "true")
config.set("Acquire::AllowInsecureRepositories", "1")
init_system() # Force config reread
self.cache.open()
def set_apt_auth(self):
config.set("Apt::Get::AllowUnauthenticated", "false")
config.set("Acquire::AllowInsecureRepositories", "0")
init_system() # Force config reread
self.cache.open()
def setup_sources(self):
# This part must be done unauthenticated
self.set_apt_unauth()
# Get sources and add
keydebs = []
keystores = []
sources_file = Path("sources.yaml")
sources = self.yaml.load(sources_file)
for source in sources['sources']:
print("Source: " + source['shortcut'])
shortcut = shortcut_handler(source['shortcut'])
shortcut.add_key()
self.software_properties.add_source_from_shortcut(shortcut, True)
try:
keyfile = source['key']
kfp = Path(keyfile)
# print(kfp)
if kfp.suffix is '': # We have a package name
if keydebs.count(keyfile) == 0:
keydebs.append(keyfile)
elif kfp.suffix is '.gpg': # We have a keyring
if keystores.count(keyfile) == 0:
keystores.append(keyfile)
else:
print("I don't know what to do with: " + keyfile)
except KeyError:
pass # We don't care if there is no key, it may be a ppa which installs keys different.
self.software_properties.sourceslist.save()
# If I don't do this twice it never sees the update (??)
self.cache.update(fetch_progress=apt.progress.text.AcquireProgress())
self.cache.open()
self.cache.update(fetch_progress=apt.progress.text.AcquireProgress())
self.cache.open()
# Get keyring packages (and any deps, but we ignore those for now)
if len(keydebs) > 0:
for keydeb in keydebs:
pkg = self.cache[keydeb]
pkg.mark_install()
# Only fetch, we have to do somewhat of a manual process to install in the chroot.
self.cache.fetch_archives(progress=apt.progress.text.AcquireProgress(), allow_unauthenticated=True)
# Extract keyring packages that were fetched above.
for pkg in keydebs:
foo = self.pkgcache.glob(pkg + "*.deb")
for item in foo:
print("PKG: " + str(item))
df = debfile.DebPackage(str(item), self.cache)
if df.check():
print("Check!")
df.install(install_progress=InstallProgress(str(self.root)))
# Now that the keyrings are installed we can go back to authenticated repos.
self.set_apt_auth()
# Update and clear all previously selected packages.
self.cache.update(fetch_progress=apt.progress.text.AcquireProgress())
self.cache.open()
self.cache.clear()
self.dump_config()
def select_packages(self):
# Get requested packages.
packages_file = Path("packages.yaml")
packages = self.yaml.load(packages_file)
for package in packages['packages']:
pkg = self.cache[package]
pkg.mark_install()
cand = pkg.candidate
print("Deps and Suggestions for package: " + str(pkg))
for deps in cand.dependencies:
for dep in deps.target_versions:
depname = dep.record['Package']
if dep.priority in 'important' or dep.priority in 'required':
print("Got DEP: " + depname + " PRI: " + str(dep.priority))
try:
deppkg = self.cache[depname]
if deppkg.is_now_broken:
print("EE: Pkg: " + depname + " IS BROKEN")
sys.exit(-1)
deppkg.mark_auto()
deppkg.mark_install(from_user=False)
except KeyError:
print("This DEP does not exist: " + depname)
for deps in cand.recommends:
for dep in deps.target_versions:
depname = dep.record['Package']
print("Got REC: " + depname)
try:
deppkg = self.cache[depname]
deppkg.mark_auto()
deppkg.mark_install(from_user=False)
except KeyError:
print("This REC does not exist: " + depname)
# Check for essential packages
for pkgname in self.cache.keys():
if self.cache[pkgname].essential:
pkg = self.cache[pkgname]
# print("ARCH: " + pkg.architecture())
if pkg.architecture() not in "amd64":
# print("Incorrect ARCH: " + pkg.architecture())
continue
print("Got ESS: " + pkgname)
pkg.mark_auto()
pkg.mark_install(from_user=False)
# Fetch all selected packages
self.cache.fetch_archives(progress=apt.progress.text.AcquireProgress())#, allow_unauthenticated=True)
def select_package_for_extraction(self, pkgfile):
if self.to_install.get(pkgfile, None) is None:
print("Selecting: " + pkgfile + " For extraction.")
df = debfile.DebPackage(pkgfile, self.cache)
self.to_install[pkgfile] = {"package": df, "file": pkgfile, "check": False }
if not df.check_conflicts():
print("Conflicts detected in package: " + pkgfile)
self.conflicts[pkgfile] = []
for conflict in df.conflicts:
self.conflicts.append(conflict)
print("\t\t" + str(conflict))
if df.check():
self.to_install[str(pkgfile)]['check'] = True
else:
print("Missing Deps: " + str(df.required_changes))
(required_install, required_remove, _) = df.required_changes
for req in required_install:
pkgreq = self.cache[req]
if not pkgreq.marked_install:
pkgreq.mark_auto()
pkgreq.mark_install(from_user=False)
# self.cache.fetch_archives(progress=apt.progress.text.AcquireProgress())#, allow_unauthenticated=True)
files = self.pkgcache.glob(req + "*.deb")
if files:
for file in files:
self.select_package_for_extraction(str(file))
def select_all_packages_for_extraction(self):
pkgfiles = self.pkgcache.glob("*.deb")
for pkg in pkgfiles:
self.select_package_for_extraction(str(pkg))
def dump_config(self):
cfgfile = Path("apt.conf")
try:
cfgfile.unlink()
except FileNotFoundError:
pass
cfgfile.write_text(config.dump())
def extract_selections(self):
# extract all the downloaded packages
self.toextract = []
for name in self.cache.keys():
pkg = self.cache[name]
if pkg.marked_install:
print("Package: " + pkg.name + " is downloaded")
self.toextract.append(name)
cfgfile = Path("apt.conf")
try:
cfgfile.unlink()
except FileNotFoundError:
pass
cfgfile.write_text(config.dump())
# env = {"DEBIAN_FRONTEND": "noninteractive", "DEBCONF_NONINTERACTIVE_SEEN": "true", "LC_ALL": "C", "LANGUAGE": "C", "LANG": "C", "APT_CONFIG": str(cfgfile)}
# #print(env)
# args = ["apt-get", "-o Apt::Architecture=" + ARCH,
# "-o Apt::Architectures=amd64,i386",
# "-o Apt::Install-Recommends=false",
# "-o Dir=" + str(ROOT),
# "-o Dir::State=" + str(libdir),
# "-o Dir::State::Status=" + str(dpkgdir) + "/status",
# "-o Dir::Cache=" + str(cachedir),
# "-o Dir::Etc=" + str(etcdir),
# "-o Dpkg::Options::=--root="+str(ROOT),
# "-o Dpkg::Options::=--instdir="+str(ROOT),
# "-o Dpkg::Options::=--admindir="+str(dpkgdir),
# "-o Dpkg::Options::=--log="+str(logfile),
# "install", "-y"]
# args.extend(toextract)
# #print(args)
# ret = subprocess.run(args, env=env)
print("Selected for extraction: ")
for pkg, data in self.to_install.items():
print("Pkg: " + pkg)
if data['check']:
print(ANSI_GREEN+"\tCheck: OK"+ANSI_RESET)
else:
print(ANSI_RED+"\tCheck: FAIL"+ANSI_RESET)
if data['package'].check():
print(ANSI_GREEN+"\tReCheck: OK"+ANSI_RESET)
else:
print(ANSI_RED+"\tReCheck: FAIL")
(required_install, required_remove, unauthenticated) = data['package'].required_changes
print("\t\tINS: " + str(required_install))
print("\t\tREM: " + str(required_remove))
print(ANSI_RESET)
print("Conflicts: ")
for pkg, conflicts in self.conflicts.items():
print("\tPkg:" + pkg)
print("Start extraction")
for pkg, data in self.to_install.items():
print("Extracting: " + pkg)
data['package'].install(install_progress=InstallProgress(str(self.root)))
# Parse args, and/or use config file
parser = argparse.ArgumentParser(description="Multistrap replacement")
parser.add_argument("--root", help="Path to root of the OS image", required=True)
args = parser.parse_args()
print(args)
builder = DistBuilder(args.root)
builder.init_chroot()
builder.init_apt()
builder.setup_sources()
builder.select_packages()
builder.select_all_packages_for_extraction()
builder.extract_selections()
# After extraction we need to setup the chroot, mount /dev, /proc, /sys etc, create /bin/sh link from shell.
ret = builder.exec_chroot(['ln', '-sf', '/bin/bash', '/bin/sh'])
if ret is not 0:
print("EE: link to /bin/sh failed!")
sys.exit(-1)
ret = builder.exec(['mount', '--bind', '/dev', args.root +"/dev"])
if ret is not 0:
print("EE: bind mount of /dev failed!")
sys.exit(-1)
ret = builder.exec_chroot(['mount', '-t', 'devpts', 'devpts', '/dev/pts'])
if ret is not 0:
print("EE: /dev/pts mount failed!")
sys.exit(-1)
ret = builder.exec_chroot(['mount', '-t', 'proc', 'proc', '/proc'])
if ret is not 0:
print("EE: /proc mount failed!")
sys.exit(-1)
ret = builder.exec_chroot(['mount', '-t', 'sysfs', 'sys', '/sys'])
if ret is not 0:
print("EE: /sys mount failed!")
sys.exit(-1)
builder.exec_preinst()
ret = builder.exec_chroot(['umount', '/proc'])
if ret is not 0:
print("EE: /proc umount failed!")
ret = builder.exec_chroot(['umount', '/sys'])
if ret is not 0:
print("EE: /sys umount failed!")
ret = builder.exec_chroot(['umount', '/dev/pts'])
if ret is not 0:
print("EE: /dev/pts umount failed!")
ret = builder.exec_chroot(['umount', '/dev'])
if ret is not 0:
print("EE: /dev umount failed!")
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment