Skip to content

Instantly share code, notes, and snippets.

@ConnorNelson
Last active April 18, 2023 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ConnorNelson/080f5f96ad548438319d78820ad2db50 to your computer and use it in GitHub Desktop.
Save ConnorNelson/080f5f96ad548438319d78820ad2db50 to your computer and use it in GitHub Desktop.
import collections
import os
import multiprocessing
import socket
import signal
import subprocess
import pathlib
import tempfile
import time
import shutil
import ctypes
CLONE_NEWNS = 0x00020000 # New mount namespace group
CLONE_NEWCGROUP = 0x02000000 # New cgroup namespace
CLONE_NEWUTS = 0x04000000 # New utsname namespace
CLONE_NEWIPC = 0x08000000 # New ipc namespace
CLONE_NEWUSER = 0x10000000 # New user namespace
CLONE_NEWPID = 0x20000000 # New pid namespace
CLONE_NEWNET = 0x40000000 # New network namespace
PR_SET_PDEATHSIG = 1
HOST_UID_MAP_BASE = 100000
HOST_UID_MAP_PRIVILEGED_OFFSET = 0
HOST_UID_MAP_UNPRIVILEGED_OFFSET = 1
HOST_UID_MAP_STEP = 1000
PRIVILEGED_UID = 0
UNPRIVILEGED_UID = 1000
libc = ctypes.CDLL("libc.so.6")
def new_user_ns(ns_flags=0, uid_map=None):
if uid_map is None:
uid_map = f"{PRIVILEGED_UID} {os.getuid()} 1\n"
unshare_semaphore = multiprocessing.Semaphore(0)
id_map_semaphore = multiprocessing.Semaphore(0)
pid = os.fork()
if pid:
unshare_semaphore.acquire()
for path in [f"/proc/{pid}/uid_map", f"/proc/{pid}/gid_map"]:
with open(path, "w") as f:
f.write(uid_map)
id_map_semaphore.release()
else:
libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
libc.unshare(CLONE_NEWUSER | ns_flags)
unshare_semaphore.release()
id_map_semaphore.acquire()
return pid
def ip_run(command, *, check=True):
try:
return subprocess.run(["/sbin/ip", *command.split()],
stdin=subprocess.DEVNULL,
encoding="ascii",
capture_output=True,
check=check)
except subprocess.CalledProcessError as e:
raise Exception(e.stderr)
def iptables_load(rules):
try:
return subprocess.run(["/sbin/iptables-restore"],
input=rules,
encoding="ascii",
capture_output=True,
check=True)
except subprocess.CalledProcessError as e:
raise Exception(e.stderr)
class Host:
_next_id = 0
def __init__(self, name, *, ns_flags=None):
self.name = name
if ns_flags is None:
ns_flags = (
CLONE_NEWNS |
CLONE_NEWCGROUP |
CLONE_NEWUTS |
CLONE_NEWIPC |
CLONE_NEWPID |
CLONE_NEWNET
)
self.ns_flags = ns_flags
self.id = Host._next_id
Host._next_id += 1
self._pid = multiprocessing.Value("i", 0)
self.runtime_path = tempfile.TemporaryDirectory()
os.chown(self.runtime_path.name, self.host_privileged_id, self.host_privileged_id)
# TODO: make runtime_path deletion not be a disaster on Exception
def run(self):
started_semaphore = multiprocessing.Semaphore(0)
pid = new_user_ns(self.ns_flags, self.host_id_map)
if pid:
self._pid.value = pid
started_semaphore.acquire()
return
self.start()
started_semaphore.release()
self.entrypoint()
def start(self):
if self.ns_flags & CLONE_NEWPID:
if os.fork():
os.wait()
os._exit(0)
libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
os.setuid(PRIVILEGED_UID)
os.setgid(PRIVILEGED_UID)
os.setgroups([PRIVILEGED_UID])
if self.ns_flags & CLONE_NEWUTS:
socket.sethostname(self.name)
if self.ns_flags & CLONE_NEWNET:
ip_run("link set lo up")
def entrypoint(self):
while True:
time.sleep(1)
def enter(self, *, uid=PRIVILEGED_UID):
ns_names ={
CLONE_NEWUSER: "user",
CLONE_NEWNS: "mnt",
CLONE_NEWCGROUP: "cgroup",
CLONE_NEWUTS: "uts",
CLONE_NEWIPC: "ipc",
CLONE_NEWPID: "pid_for_children",
CLONE_NEWNET: "net",
}
for ns_flag, ns_name in ns_names.items():
if ns_flag & (CLONE_NEWUSER | self.ns_flags):
ns_path = f"/proc/{self.pid}/ns/{ns_name}"
assert libc.setns(os.open(ns_path, 0), 0) == 0
os.setuid(uid)
os.setgid(uid)
os.setgroups([uid])
def exec(self, fn, *, uid=PRIVILEGED_UID):
# TODO: capture fn() return value (int)
pid = os.fork()
if pid:
os.wait()
return
self.enter(uid=uid)
fn()
os._exit(0)
@property
def pid(self):
return self._pid.value
@property
def host_privileged_id(self):
return (HOST_UID_MAP_BASE +
self.id * HOST_UID_MAP_STEP +
HOST_UID_MAP_PRIVILEGED_OFFSET)
@property
def host_unprivileged_id(self):
return (HOST_UID_MAP_BASE +
self.id * HOST_UID_MAP_STEP +
HOST_UID_MAP_UNPRIVILEGED_OFFSET)
@property
def host_id_map(self):
return (
f"{PRIVILEGED_UID} {self.host_privileged_id} 1\n"
f"{UNPRIVILEGED_UID} {self.host_unprivileged_id} 1\n"
)
class Network(Host):
def __init__(self, *args, **kwargs):
hosts = kwargs.pop("hosts", [])
super().__init__(*args, **kwargs, ns_flags=CLONE_NEWNET|CLONE_NEWUTS)
self.host_ips = {}
self.host_edges = collections.defaultdict(set)
self._next_ip = 1
for host in hosts:
self.dhcp(host)
@property
def hosts(self):
return self.host_ips.keys()
def dhcp(self, host):
if host not in self.host_ips:
assert self._next_ip < 255
self.host_ips[host] = f"10.0.0.{self._next_ip}"
self._next_ip += 1
return self.host_ips[host]
def connect(self, host1, host2):
self.dhcp(host1)
self.dhcp(host2)
self.host_edges[host1].add(host2)
self.host_edges[host2].add(host1)
@property
def host_id_map(self):
uid_map_range = HOST_UID_MAP_STEP * (len(self.hosts) + 1)
return (
f"{PRIVILEGED_UID} {os.getuid()} 1\n"
f"{HOST_UID_MAP_BASE} {HOST_UID_MAP_BASE} {uid_map_range}\n"
)
def start(self):
super().start()
ip_run("link add name bridge0 type bridge")
for host, host_ip in self.host_ips.items():
ip_run(f"link add veth{host.id} type veth peer name veth{host.id}-child")
ip_run(f"link set veth{host.id} master bridge0")
host.run()
ip_run(f"link set veth{host.id} up")
ip_run(f"link set veth{host.id}-child netns {host.pid}")
# TODO: host `ip_run` before chroot
host.exec(lambda: (ip_run(f"link set veth{host.id}-child name eth0"),
ip_run(f"addr add {host_ip}/24 dev eth0"),
ip_run("link set eth0 up")))
iptables_rules = [
"*filter",
":INPUT ACCEPT [0:0]",
":FORWARD DROP [0:0]",
":OUTPUT ACCEPT [0:0]",
]
for src_host, dst_hosts in self.host_edges.items():
for dst_host in dst_hosts:
src_ip = self.host_ips[src_host]
dst_ip = self.host_ips[dst_host]
iptables_rules.append(f"-A FORWARD -s {src_ip}/32 -d {dst_ip}/32 -j ACCEPT")
iptables_rules.append("COMMIT")
iptables_rules.append("")
iptables_load("\n".join(iptables_rules))
ip_run("link set bridge0 up")
def entrypoint(self):
while True:
time.sleep(1)
class SimpleFSHost(Host):
def __init__(self, *args, **kwargs):
self.src_path = pathlib.Path(kwargs.pop("src_path"))
super().__init__(*args, **kwargs)
@property
def fs_path(self):
return pathlib.Path(self.runtime_path.name) / "fs"
def start(self):
super().start()
shutil.copytree(self.src_path, self.fs_path, symlinks=True)
for path_name in ["bin", "sbin", "usr", "root", "home", "etc", "tmp", "dev"]:
path = self.fs_path / path_name
path.mkdir(exist_ok=True)
os.chroot(self.fs_path) # TODO: seccomp away chroot
os.chdir("/")
os.mknod("/dev/null", 0o666)
os.mkdir("/home/user")
os.chown("/home/user", UNPRIVILEGED_UID, UNPRIVILEGED_UID)
with open("/etc/passwd", "w") as f:
f.write("".join(f"{user}\n" for user in [
f"root:x:{PRIVILEGED_UID}:{PRIVILEGED_UID}:root:/root:/bin/sh",
f"user:x:{UNPRIVILEGED_UID}:{UNPRIVILEGED_UID}:user:/home/user:/bin/sh",
f"nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin",
]))
with open("/etc/group", "w") as f:
f.write("".join(f"{group}\n" for group in [
f"root:x:{PRIVILEGED_UID}:",
f"user:x:{UNPRIVILEGED_UID}:",
f"nobody:x:65534:",
]))
def enter(self, *args, **kwargs):
super().enter(*args, **kwargs)
os.chroot(self.fs_path) # TODO: seccomp away chroot
os.chdir("/")
def mixin_host(*hosts, **kwargs):
return type("MixinHost", hosts, kwargs)
def main():
"""
mkdir /opt/busybox
cd /opt/busybox
wget https://busybox.net/downloads/busybox-1.33.2.tar.bz2
tar xf busybox-1.33.2.tar.bz2
cd busybox-1.33.2
make defconfig
sed -e 's/.*CONFIG_STATIC .*/CONFIG_STATIC=y/' -i .config
make install
ln -s "$PWD/_install" /opt/busybox/fs
"""
src_path = pathlib.Path("/opt/busybox/fs")
hosts = [
SimpleFSHost(f"host-{i+1}", src_path=src_path)
for i in range(3)
]
network = Network("router")
network.connect(hosts[0], hosts[1])
network.connect(hosts[1], hosts[2])
network.run()
# network.exec(lambda: os.system("bash"))
def busybox_shell():
environ = {
"PATH": "/usr/sbin:/usr/bin:/sbin:/bin",
"HOME": "/root",
"PS1": r"\u@\h:\w\$ ",
}
os.execve("/bin/sh", ["sh"], environ)
host_names = {
host.name: host
for host in hosts
}
import readline
while True:
print("Hosts:")
for name in host_names:
print(f"- {name}")
try:
host_name = input("host> ")
except EOFError:
print()
break
if host_name not in host_names:
print("Invalid host")
continue
host = host_names[host_name]
host.exec(busybox_shell)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment