Skip to content

Instantly share code, notes, and snippets.

@astoeckel
Last active December 12, 2023 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save astoeckel/601c66f2475e7516102b0f5a67b53a1c to your computer and use it in GitHub Desktop.
Save astoeckel/601c66f2475e7516102b0f5a67b53a1c to your computer and use it in GitHub Desktop.
Python sandbox
#!/usr/bin/env python3
"""
This module provides an `eval_in_sandbox` function that, on Linux systems,
executes arbitrary Python code in a secure and lightweight sandbox with
memory, I/O, and time limits.
----
Copyright (c) 2023 Andreas Stöckel
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import io
import os
import shutil
import subprocess
import tempfile
import time
import sys
import dataclasses
import select
@dataclasses.dataclass
class ProcessOutput:
    """
    Result of running a subprocess: its return code and captured output.

    An instance is truthy if and only if `returncode` is zero, so callers can
    write `if result: ...` to test for success.
    """

    # Exit status of the process; zero is treated as success by `__bool__`
    returncode: int = 0

    # Text captured from the standard output stream of the process
    stdout: str = ""

    # Text captured from the standard error stream of the process
    stderr: str = ""

    def __bool__(self):
        """Return `True` if the process exited with return code zero."""
        return self.returncode == 0
class SandboxError(Exception):
    """
    Raised when code cannot be executed in the sandbox, or when the sandboxed
    process violates one of the imposed limits.
    """
def _resolve_python_executable() -> str:
    """
    Returns the path of the Python interpreter to run inside the sandbox.

    :raises SandboxError: if no interpreter can be found, or if the
        interpreter is not located below `/usr` (the only directory tree,
        besides the library directories, that is mounted into the sandbox).
    """
    # Sometimes `sys.executable` returns wrong results (it may even be empty
    # or `None`), so we use `shutil.which` as a fallback
    python_exe = sys.executable
    if not python_exe or not os.access(python_exe, os.X_OK):
        python_exe = shutil.which("python3")
    if python_exe is None:
        raise SandboxError("Cannot find the Python executable")

    # Compare against "/usr/" (with the trailing slash) so that unrelated
    # siblings such as "/usrlocal/..." are not accepted by accident
    if not python_exe.startswith("/usr/"):
        raise SandboxError("Python interpreter not located in `/usr`")
    return python_exe


def _drain_process(
    proc: subprocess.Popen,
    timeout_max_time_sec: int,
    io_max_bytes: int,
) -> tuple:
    """
    Reads stdout/stderr of `proc` until both streams are closed and the
    process has exited, enforcing the wall-clock and I/O limits.

    :param proc: is a process created with `stdout` and `stderr` set to
        `subprocess.PIPE`.
    :param timeout_max_time_sec: is the maximum wall-clock time in seconds.
    :param io_max_bytes: is the maximum number of bytes accepted per stream.
    :return: a tuple `(returncode, stdout_bytes, stderr_bytes)`.
    :raises SandboxError: if the timeout or the I/O limit is exceeded. The
        process is *not* killed here; that is up to the caller.
    """
    read_chunk_size = 1024
    poll_interval_sec = 0.1

    # Work on the raw file descriptors. Reading via the buffered file objects
    # could move data into a Python-side buffer that `select` cannot see.
    fd_stdout, fd_stderr = proc.stdout.fileno(), proc.stderr.fileno()
    buffers = {fd_stdout: io.BytesIO(), fd_stderr: io.BytesIO()}

    # Mark stdout and stderr as non-blocking so a read can never stall the
    # loop, which would defeat the wall-clock timeout
    for fd in buffers:
        os.set_blocking(fd, False)

    # Descriptors that have not yet reached end-of-file
    open_fds = set(buffers)

    # Get a start timestamp from a monotonic time source
    t_start = time.monotonic()

    returncode = None
    while returncode is None:
        # Abort if we get a timeout
        if time.monotonic() - t_start > timeout_max_time_sec:
            raise SandboxError("Process timed out")

        if open_fds:
            # Wait (for at most 100ms) for data or an exceptional condition
            watched = list(open_fds)
            r_fds, _, x_fds = select.select(
                watched, [], watched, poll_interval_sec
            )

            # Read data from stdout/stderr; abort if the I/O buffers are too
            # large
            for fd in r_fds:
                try:
                    chunk = os.read(fd, read_chunk_size)
                except BlockingIOError:
                    # Spurious wake-up; no data yet, but the stream is open
                    continue
                if not chunk:
                    # End-of-file; do not ask `select` for that fd anymore
                    open_fds.discard(fd)
                    continue
                buf = buffers[fd]
                buf.write(chunk)
                if buf.tell() > io_max_bytes:
                    raise SandboxError("I/O buffer size exceeded")

            # Stop watching descriptors flagged with an exceptional condition
            for fd in x_fds:
                open_fds.discard(fd)

        # Wait for the process to exit, if stdout/stderr were closed
        if not open_fds:
            try:
                returncode = proc.wait(poll_interval_sec)
            except subprocess.TimeoutExpired:
                # `Popen.wait` signals a timeout with `TimeoutExpired` (not
                # the builtin `TimeoutError`). Continue in the loop; our own
                # high-level timeout code will trigger at some point.
                pass

    return returncode, buffers[fd_stdout].getvalue(), buffers[fd_stderr].getvalue()


def eval_in_sandbox(
    python_code: str,
    mem_max_mb: int = 100,
    cpu_max_time_sec: int = 1,
    timeout_max_time_sec: int = 10,
    io_max_kb: int = 128,
) -> ProcessOutput:
    """
    Executes the given script in an isolated sandbox with memory, CPU, and I/O
    limits. Returns the captured stdout and stderr.

    The code is written to a temporary directory and executed with the Python
    interpreter through `prlimit` (resource limits) and `bwrap` (isolation).

    :param python_code: is a string containing the Python code that should be
        safely executed.
    :param mem_max_mb: is the maximum size of the address space used by the
        child process in mebibytes. Note that this value must be substantially
        larger than the actual amount of memory used by the process, since it
        includes the memory needed by the Python interpreter executable itself.
    :param cpu_max_time_sec: is the maximum CPU time that can be used by the
        child process. CPU time is the time during which the process is
        actually active; this doesn't include sleeping, or waiting for I/O.
    :param timeout_max_time_sec: is the maximum wall-clock time for which the
        process may be alive.
    :param io_max_kb: maximum number of kibibytes that may be written by the
        process to each of stdout and stderr.
    :return: an instance of the `ProcessOutput` structure, containing the
        process return code and the whitespace-stripped stdout/stderr.
    :raises SandboxError: if a required executable cannot be found, if the
        process exceeds the wall-clock or I/O limits, or if its output is not
        valid UTF-8.
    """
    # Convert the given memory size limits to bytes
    mem_max_bytes = 1024 * 1024 * mem_max_mb
    io_max_bytes = 1024 * io_max_kb

    # Search for the bubblewrap executable
    bwrap_exe = shutil.which("bwrap")
    if bwrap_exe is None:
        raise SandboxError("Cannot find the `bwrap` executable")

    # Search for the prlimit executable
    prlimit_exe = shutil.which("prlimit")
    if prlimit_exe is None:
        raise SandboxError("Cannot find the `prlimit` executable")

    # Determine the path to the Python interpreter
    python_exe = _resolve_python_executable()

    # Securely create a temporary directory; this directory will automatically
    # be deleted
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write the given code into the temporary directory
        code_filename = os.path.join(tmp_dir, "code.py")
        with open(code_filename, "w", encoding="utf-8") as f:
            f.write(python_code)
            f.write("\n")

        # Assemble the arguments that we need to pass to `bwrap`
        args = [
            # Use `prlimit` to limit the CPU time and the size of the address
            # space used by the child commands
            prlimit_exe,
            # Set the soft and hard cpu time limit
            f"--cpu={cpu_max_time_sec}:{cpu_max_time_sec}",
            # Set the soft and hard address space limit
            f"--as={mem_max_bytes}:{mem_max_bytes}",
            # Execute bwrap
            bwrap_exe,
            # Use an isolated terminal session for the subprocess
            "--new-session",
            # Isolate all possible Linux namespaces.
            "--unshare-all",
            # Clear all environment variables
            "--clearenv",
            # Execute the script inside the "home" directory
            "--chdir", "/home",
            # Mount the temporary directory as home
            "--ro-bind", tmp_dir, "/home",
            # Make the directory containing the Python interpreter available
            "--ro-bind", "/usr", "/usr",
            "--ro-bind", "/lib", "/lib",
            "--ro-bind", "/lib64", "/lib64",
            # Lastly, make the virtual root directory read-only
            "--remount-ro", "/",
            # Execute the Python interpreter
            python_exe,
            # Disable buffering
            "-u",
            # Execute the script
            "/home/code.py",
        ]

        # Create the subprocess; close stdin
        proc = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.DEVNULL,
        )
        try:
            # Read from stdout/stderr while handling timeouts and I/O limits
            returncode, raw_stdout, raw_stderr = _drain_process(
                proc, timeout_max_time_sec, io_max_bytes
            )
        finally:
            # Whatever happened (limit violation, Ctrl+C, ...), never leave
            # the child running and always release the pipe descriptors
            if proc.poll() is None:
                proc.kill()
            proc.wait()
            proc.stdout.close()
            proc.stderr.close()

    # Try to decode the stdout/stderr buffer
    try:
        stdout = raw_stdout.decode("utf-8")
        stderr = raw_stderr.decode("utf-8")
    except UnicodeDecodeError as err:
        raise SandboxError("Subprocess did not produce valid UTF-8") from err

    # Strip spaces away and return the output!
    return ProcessOutput(
        returncode=returncode,
        stderr=stderr.strip(),
        stdout=stdout.strip(),
    )
if __name__ == "__main__":

    def main():
        """
        Small test program.

        Runs a demo script in the sandbox that prints its identity and
        environment, attempts to create and to delete a file, writes to
        stderr, and exits with code 42. The captured output is printed and
        the sandboxed return code is used as this program's exit code.
        """
        # NOTE: the script below is passed verbatim to the sandboxed
        # interpreter, so its own indentation is significant.
        res = eval_in_sandbox("""
print('Hello World from stdout!')
import os
import json
print("My UID is", os.getuid())
print("My GID is", os.getgid())
print("My PID is", os.getpid())
print("My CWD is", os.getcwd())
print("My environment is", json.dumps(dict(**os.environ), indent=4))
try:
    with open("./foo.txt", "w") as f:
        f.write("EVIL\\n")
except BaseException as e:
    print(f"Writing to a file failed with: {e}")
try:
    os.unlink("code.py")
except BaseException as e:
    print(f"Deleting a file failed with: {e}")
import sys
sys.stderr.write('Hello World from stderr!')
sys.exit(42)
""")
        print(f"Stdout: {res.stdout}")
        print(f"Stderr: {res.stderr}")
        print(f"Returncode: {res.returncode}")
        sys.exit(res.returncode)

    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment