Created
May 3, 2017 11:10
-
-
Save cryzed/1fff0a41309f4555efbb48cf28cd7b65 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import collections | |
import itertools | |
import os | |
import re | |
import shutil | |
import stat | |
import subprocess | |
import sys | |
import xmlrpc.client | |
import xmlrpc.server | |
import appdirs | |
SUCCESS_EXIT_CODE = 0 | |
ERROR_EXIT_CODE = 1 | |
ENCODING = 'UTF-8' | |
FIREJAIL_DEFAULT_PROFILE_PATH = '/etc/firejail/default.profile' | |
# Instructions with relative path options that are not needed in a private home | |
FIREJAIL_PATH_INSTRUCTIONS = ['blacklist', 'blacklist-nolog', 'read-only'] | |
USER_CONFIG_DIR = appdirs.user_config_dir() | |
USER_HOME_PATH = os.path.expanduser('~') | |
FIREJAIL_USER_PROFILES_PATH = os.path.join(USER_CONFIG_DIR, 'firejail') | |
GLASSBOX_PROFILE_PATH = os.path.join(FIREJAIL_USER_PROFILES_PATH, '_glassbox.profile') | |
GLASSBOX_HOMES_PATH = os.getenv('GLASSBOX_HOMES_PATH', '/mnt/firejail/') | |
GLASSBOX_DEFAULT_HOME_PATH = os.path.join(GLASSBOX_HOMES_PATH, '_default') | |
GLASSBOX_EXECUTORS_PATH = os.getenv('GLASSBOX_EXECUTORS_PATH', '/usr/local/bin') | |
# TODO: Stop using private folder, instead use --whitelist options | |
GLASSBOX_EXECUTOR_TEMPLATE = '''#!/bin/sh | |
firejail --name={name} --private={private} {path} "$@" | |
''' | |
GLASSBOX_PROFILE_TEMPLATE = '''#include ~/.config/firejail/_whitelist-desktop.profile | |
#include ~/.config/firejail/_whitelist-documents.profile | |
#include ~/.config/firejail/_whitelist-downloads.profile | |
#include ~/.config/firejail/_whitelist-music.profile | |
#include ~/.config/firejail/_whitelist-pictures.profile | |
#include ~/.config/firejail/_whitelist-videos.profile | |
include {glassbox_profile_path} | |
'''.format(glassbox_profile_path=GLASSBOX_PROFILE_PATH) | |
GLASSBOX_DEFAULT_HOME_FILE_PATHS = [ | |
# GTK 2 | |
os.path.join(USER_HOME_PATH, '.gtkrc-2.0'), | |
# GTK 3 | |
os.path.join(USER_CONFIG_DIR, 'gtk-3.0', 'settings.ini'), | |
# Qt 4 | |
os.path.join(USER_CONFIG_DIR, 'Trolltech.conf'), | |
# KDE 4 | |
os.path.join(USER_HOME_PATH, '.kde4', 'share', 'kdeglobals'), | |
# KDE Plasma 5 | |
os.path.join(USER_CONFIG_DIR, 'kdeglobals'), | |
# fontconfig | |
os.path.join(USER_CONFIG_DIR, 'fontconfig', 'fonts.conf') | |
] | |
# PID has to be 2 because it is the first process started in the virtual environment after /bin/sh | |
FIREJAIL_TRACE_LINE_REGEX = re.compile(r'^2:(?P<name>.+?):(?P<function>.+?)\s(?P<arguments>.+?)$') | |
argument_parser = argparse.ArgumentParser() | |
sub_parsers = argument_parser.add_subparsers(dest='command') | |
sub_parsers.required = True | |
create_parser = sub_parsers.add_parser('create') | |
create_parser.add_argument('path') | |
create_parser.add_argument('--executors-path', default=GLASSBOX_EXECUTORS_PATH) | |
create_parser.add_argument('--homes-path', default=GLASSBOX_HOMES_PATH) | |
create_parser.add_argument('--force', action='store_true') | |
remove_parser = sub_parsers.add_parser('remove') | |
remove_parser.add_argument('name') | |
update_parser = sub_parsers.add_parser('update') | |
update_parser.add_argument('--homes', action='store_true') | |
update_parser.add_argument('--default-home', action='store_true') | |
update_parser.add_argument('--default-profile', action='store_true') | |
update_parser.add_argument('--all', action='store_true') | |
daemon_parser = sub_parsers.add_parser('daemon') | |
daemon_parser.add_argument('--port', type=int, default=8084) | |
daemon_parser.add_argument('--whitelist', nargs='+', default=[]) | |
execute_parser = sub_parsers.add_parser('execute') | |
execute_parser.add_argument('command', nargs='*') | |
execute_parser.add_argument('--port', type=int, default=8084) | |
whitelist_skeleton_parser = sub_parsers.add_parser('whitelist-skeleton') | |
whitelist_skeleton_parser.add_argument('path') | |
class WhitelistError(Exception): | |
pass | |
class FirejailProfile: | |
_Instruction = collections.namedtuple('Instruction', ['key', 'value', 'raw']) | |
def __init__(self): | |
self.instructions = [] | |
self.path = None | |
@classmethod | |
def from_path(cls, path, encoding=ENCODING): | |
with open(path, encoding=ENCODING) as file: | |
lines = file.readlines() | |
profile = cls() | |
profile.extend(lines) | |
profile.path = path | |
return profile | |
def append(self, line): | |
# Normalize newline characters | |
line = line.rstrip('\r\n') | |
# If the line only consists of whitespace don't do any splitting | |
if not line.strip(): | |
instruction = self._Instruction('', '', line) | |
self.instructions.append(instruction) | |
return | |
# Split line at most once, into a key-value pair. If the line is an instruction without arguments, value will be | |
# an empty list. | |
key, *value = line.split(None, 1) | |
value = value[0] if value else '' | |
# Special handling for comments | |
if key.startswith('#'): | |
instruction = self._Instruction('#', value, line) | |
else: | |
instruction = self._Instruction(key, value, line) | |
self.instructions.append(instruction) | |
def extend(self, lines): | |
for line in lines: | |
self.append(line) | |
def __getitem__(self, key): | |
return [instruction.value for instruction in self.instructions if instruction.key == key] | |
def _delete_key(self, key): | |
indices = [] | |
for index, instruction in enumerate(self.instructions): | |
if instruction.key == key: | |
indices.append(index) | |
# Start deleting from the back to preserve correct indices without keeping track of an offset | |
for index in reversed(indices): | |
del self.instructions[index] | |
# Accept instruction key or line index | |
def __delitem__(self, key): | |
if isinstance(key, str): | |
self._delete_key(key) | |
else: | |
del self.instructions[key] | |
def __iter__(self): | |
return iter(instruction.raw for instruction in self.instructions) | |
def __str__(self): | |
return '\n'.join([instruction.raw for instruction in self.instructions]) | |
def __repr__(self): | |
return f'<FirejailProfile {self.path!r}>' | |
def __len__(self): | |
return len(self.instructions) | |
def create_command(arguments): | |
name = os.path.basename(arguments.path) | |
# Create home | |
home_path = os.path.join(GLASSBOX_HOMES_PATH, name) | |
if os.path.exists(home_path): | |
if not arguments.force: | |
print(f'Glassbox {home_path} already exists. Use --force to overwrite.', file=sys.stderr) | |
return ERROR_EXIT_CODE | |
print(f'- Removing old Glassbox home {home_path}...') | |
shutil.rmtree(home_path) | |
if not os.path.exists(GLASSBOX_DEFAULT_HOME_PATH): | |
print('- Creating default home...') | |
# TODO: Better decoupling | |
arguments = update_parser.parse_args(['--default-home']) | |
update_command(arguments) | |
print(f'- Copying default home {GLASSBOX_DEFAULT_HOME_PATH} to {home_path}...') | |
shutil.copytree(GLASSBOX_DEFAULT_HOME_PATH, home_path) | |
# Create profile | |
path = os.path.join(FIREJAIL_USER_PROFILES_PATH, f'{name}.profile') | |
print(f'- Creating profile {path}') | |
with open(path, 'w', encoding=ENCODING) as file: | |
file.write(GLASSBOX_PROFILE_TEMPLATE) | |
# Create executor | |
executor_path = os.path.join(arguments.executors_path, name) | |
print(f'- Creating executor {executor_path}') | |
executor_code = GLASSBOX_EXECUTOR_TEMPLATE.format(name=name, private=home_path, path=arguments.path) | |
with open(executor_path, 'w', encoding=ENCODING) as file: | |
file.write(executor_code) | |
# Make executor executable | |
stat_ = os.stat(executor_path) | |
os.chmod(executor_path, stat_.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) | |
def remove_command(arguments): | |
home_path = os.path.join(GLASSBOX_HOMES_PATH, arguments.name) | |
profile_path = os.path.join(FIREJAIL_USER_PROFILES_PATH, arguments.name) | |
executor_path = os.path.join(GLASSBOX_EXECUTORS_PATH, arguments.name) | |
if os.path.exists(home_path): | |
print(f'- Deleting home {home_path}') | |
shutil.rmtree(home_path) | |
if os.path.exists(profile_path): | |
print(f'- Deleting profile {profile_path}') | |
os.remove(profile_path) | |
if os.path.exists(executor_path): | |
print(f'- Deleting executor {executor_path}') | |
os.remove(executor_path) | |
def load_firejail_profile_hierarchy(profile): | |
profiles = [profile] | |
includes = profile['include'] | |
while includes: | |
path = includes.pop(0) | |
if not os.path.exists(path): | |
continue | |
profile = FirejailProfile.from_path(path) | |
profiles.append(profile) | |
includes.extend(profile['include']) | |
return profiles | |
def get_glassbox_profile(): | |
default_profile = FirejailProfile.from_path(FIREJAIL_DEFAULT_PROFILE_PATH) | |
profiles = load_firejail_profile_hierarchy(default_profile) | |
glassbox_profile = FirejailProfile() | |
glassbox_profile.append(f'# Glassbox profile') | |
glassbox_profile.append('') | |
glassbox_profile.append('# Included by Glassbox') | |
glassbox_profile.append('blacklist /mnt') | |
glassbox_profile.append('blacklist /run/media/user') | |
glassbox_profile.append('include /etc/firejail/whitelist-common.inc') | |
for profile in profiles: | |
glassbox_profile.append('') | |
glassbox_profile.append(f'# Included from: {profile.path}') | |
# Remove all includes, comments and empty lines | |
del profile['include'] | |
del profile['#'] | |
del profile[''] | |
# Remove all (no-)blacklist/read-only instructions with relative paths | |
indices = [] | |
for index, instruction in enumerate(profile.instructions): | |
if instruction.key in FIREJAIL_PATH_INSTRUCTIONS and not instruction.value.startswith('/'): | |
indices.append(index) | |
for index in reversed(indices): | |
del profile[index] | |
glassbox_profile.extend(profile) | |
# Add trailing newline | |
glassbox_profile.append('') | |
return glassbox_profile | |
def find_paths(path, predicate=lambda path: True, recursive=True, follow_symlinks=False): | |
for root, directories, filenames in os.walk(path, followlinks=follow_symlinks): | |
for name in itertools.chain([root], directories, filenames): | |
path = os.path.join(root, name) | |
if predicate(path): | |
yield path | |
if not recursive: | |
break | |
def update_command(arguments): | |
if arguments.default_profile or arguments.all: | |
print(f'- Creating Glassbox profile...') | |
profile = get_glassbox_profile() | |
os.makedirs(os.path.dirname(GLASSBOX_PROFILE_PATH), exist_ok=True) | |
with open(GLASSBOX_PROFILE_PATH, 'w', encoding=ENCODING) as file: | |
file.write(str(profile)) | |
print(f'- Glassbox profile written to {GLASSBOX_PROFILE_PATH}.') | |
if arguments.default_home or arguments.all: | |
user_home_path = os.path.expanduser('~') | |
for path in GLASSBOX_DEFAULT_HOME_FILE_PATHS: | |
if not os.path.exists(path): | |
continue | |
# Make path relative to the user's home directory | |
relative_path = os.path.relpath(path, user_home_path) | |
print(f'- Updating default home {relative_path}...') | |
destination_path = os.path.join(GLASSBOX_DEFAULT_HOME_PATH, relative_path) | |
# TODO: Preserve original rights of created directories using shutil.copystat | |
os.makedirs(os.path.dirname(destination_path), exist_ok=True) | |
# Symlinks should not be followed, i.e. resolved. They should be copied and only resolved at runtime. | |
try: | |
shutil.copy2(path, destination_path, follow_symlinks=False) | |
# This can occur when one tries to overwrite a symbolic link with an identical symbolic link | |
except shutil.SameFileError: | |
pass | |
print('- Updated default home.') | |
if arguments.homes or arguments.all: | |
glassbox_paths = set(find_paths(GLASSBOX_HOMES_PATH, os.path.isdir, recursive=False)) | |
# Remove the root path and the default home path in case it is located in the same directory | |
glassbox_paths -= {GLASSBOX_DEFAULT_HOME_PATH, GLASSBOX_HOMES_PATH} | |
# Relative paths to the files contained in the default home | |
relative_paths = [os.path.relpath(path, GLASSBOX_DEFAULT_HOME_PATH) for path in | |
find_paths(GLASSBOX_DEFAULT_HOME_PATH, os.path.isfile)] | |
for glassbox_path in sorted(glassbox_paths): | |
print(f'- Updating {glassbox_path}...') | |
for relative_path in relative_paths: | |
source_path = os.path.join(GLASSBOX_DEFAULT_HOME_PATH, relative_path) | |
destination_path = os.path.join(glassbox_path, relative_path) | |
# TODO: Preserve original rights of created directories using shutil.copystat | |
os.makedirs(os.path.dirname(destination_path), exist_ok=True) | |
# Symlinks should not be followed, i.e. resolved. They should be copied and only resolved at runtime. | |
try: | |
shutil.copy2(source_path, destination_path, follow_symlinks=False) | |
# This can occur when one tries to overwrite a symbolic link with an identical symbolic link | |
except shutil.SameFileError: | |
pass | |
print('- Updated all homes.') | |
def daemon_execute(command, whitelist): | |
path = command[0] | |
if path not in whitelist: | |
raise WhitelistError(f'{path} is not whitelisted') | |
subprocess.Popen(command, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) | |
return SUCCESS_EXIT_CODE | |
def daemon_command(arguments): | |
# Only allow daemon to run on localhost | |
server = xmlrpc.server.SimpleXMLRPCServer(('127.0.0.1', arguments.port), allow_none=True, logRequests=False) | |
server.register_function(lambda command: daemon_execute(command, arguments.whitelist), 'execute') | |
server.serve_forever() | |
return SUCCESS_EXIT_CODE | |
def execute_command(arguments): | |
client = xmlrpc.client.ServerProxy(f'http://127.0.0.1:{arguments.port}') | |
try: | |
client.execute(arguments.command) | |
except ConnectionRefusedError: | |
print(f'Connection refused on port {arguments.port}.', file=sys.stderr) | |
return ERROR_EXIT_CODE | |
except xmlrpc.client.Fault as exception: | |
print(exception.faultString, file=sys.stderr) | |
return ERROR_EXIT_CODE | |
return SUCCESS_EXIT_CODE | |
def parse_trace_line(line): | |
match = FIREJAIL_TRACE_LINE_REGEX.match(line) | |
if not match: | |
return | |
data = match.groupdict() | |
return data['function'], data['arguments'] | |
def whitelist_skeleton_command(arguments): | |
command = ['firejail', '--noprofile', '--trace', arguments.path] | |
process = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) | |
process.wait() | |
home_path = os.path.expanduser('~') | |
accessed = collections.defaultdict(set) | |
for index, line in enumerate(process.stdout): | |
# Skip strange first line: '\x1b]0;firejail {path} \x07' | |
if index == 0: | |
line = line.split('\x07', 1)[1] | |
data = parse_trace_line(line) | |
if data is None: | |
continue | |
function, arguments = data | |
if function in ('mkdir', 'fopen', 'open64', 'access', 'unlink') and arguments.startswith(home_path): | |
path = arguments.split(':', 1)[0] | |
accessed[path].add(function) | |
for path, functions in accessed.items(): | |
print(f'{", ".join(functions)}: {path}') | |
create_parser.set_defaults(function=create_command) | |
remove_parser.set_defaults(function=remove_command) | |
update_parser.set_defaults(function=update_command) | |
daemon_parser.set_defaults(function=daemon_command) | |
execute_parser.set_defaults(function=execute_command) | |
whitelist_skeleton_parser.set_defaults(function=whitelist_skeleton_command) | |
def main(): | |
arguments = argument_parser.parse_args() | |
argument_parser.exit(arguments.function(arguments)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment