Created
August 24, 2020 11:38
-
-
Save tomers/a5cd0dd6efcf0fe786cbdbbc4bd0dc84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import base64 | |
import json | |
import logging | |
import os | |
import random | |
import subprocess | |
import sys | |
from tempfile import NamedTemporaryFile | |
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.serialization.pkcs12 import \ | |
load_key_and_certificates | |
LOG = logging.getLogger(__name__) | |
SWG_ACCESS_ENDPOINT = '/v1/swg_access' | |
TEST_ADMIN_PASSWORD = os.environ.get('TEST_ADMIN_PASSWORD', 'MtXREnR4') | |
IS_SUDO = os.geteuid() == 0 | |
DEFAULT_STUNNEL_PORT = 22022 | |
STUNNEL_CONFIG_DIR = '/etc/stunnel' | |
KEY_FILE = 'key.pem' | |
CERT_FILE = 'cert.pem' | |
CHAIN_FILE = 'chain.pem' | |
ROOT_CA_FILE = 'proofpoint_root_ca.crt' | |
SQUID_CONF_FILE = 'squid-client.conf' | |
PAC_FILE = 'squidward/etc/pac_file.pac' | |
OUTPUT_DIR = NamedTemporaryFile(prefix='stunnel-config-').name | |
OUTPUT_CERT_PEM_FILE = os.path.join(OUTPUT_DIR, CERT_FILE) | |
OUTPUT_CHAIN_PEM_FILE = os.path.join(OUTPUT_DIR, CHAIN_FILE) | |
OUTPUT_KEY_PEM_FILE = os.path.join(OUTPUT_DIR, KEY_FILE) | |
PROOFPOINT_CA_PEM_FILE = os.path.join(OUTPUT_DIR, ROOT_CA_FILE) | |
OUTPUT_STUNNEL_CONF_FILE = os.path.join(OUTPUT_DIR, SQUID_CONF_FILE) | |
STUNNEL_CONFIG_FILE_TEMPLATE = """ | |
pid = /run/stunnel-squid.pid | |
debug = 6 | |
foreground = yes | |
[squid-client] | |
client = yes | |
accept = 127.0.0.1:{stunnel_port} | |
connect = {swg_zone}-geo.swg.metanetworks.{hostname_suffix}:3128 | |
verifyChain = yes | |
key = {stunnel_config_dir}/{key_file} | |
cert = {stunnel_config_dir}/{cert_file} | |
CAfile = {stunnel_config_dir}/{chain_file} | |
checkHost = {swg_zone}-geo.swg.metanetworks.{hostname_suffix} | |
""" | |
STUNNEL_CONFIGURATION_TEMPLATE = \ | |
f"Configuration files stored in {OUTPUT_DIR}" if IS_SUDO else \ | |
f"""- Copy configuration file to stunnel config path: | |
$ sudo cp -v {OUTPUT_DIR}/* {STUNNEL_CONFIG_DIR}""" | |
INSTRUCTIONS_TEMPLATE = f""" | |
Stunnel configuration files ready! | |
================================== | |
Stunnel configuration | |
~~~~~~~~~~~~~~~~~~~~~ | |
{STUNNEL_CONFIGURATION_TEMPLATE} | |
Running Stunnel | |
~~~~~~~~~~~~~~~ | |
- As daemon: | |
$ sudo /etc/init.d/stunnel4 restart | |
To show logs: | |
$ tail -f /var/log/stunnel4/stunnel.log | |
- Standalone: | |
$ sudo /etc/init.d/stunnel4 stop \\ | |
&& sudo stunnel4 {STUNNEL_CONFIG_DIR}/{SQUID_CONF_FILE} | |
Logs will be dumped to console | |
Configuring browser | |
~~~~~~~~~~~~~~~~~~~ | |
1. Load Proofpoint's root CA certificate in your browser: \ | |
{PROOFPOINT_CA_PEM_FILE} | |
2. Configure proxy | |
Choose proxy usage pattern: | |
- Automatic browser configuration (some traffic will be direct) | |
Setup your web browser to use PAC file from {PAC_FILE} | |
Note: make sure to update the PAC to point to your zone | |
- Manual browser configuration (all traffic goes through proxy) | |
Either in your browser or in system settings, set proxy server to: | |
http://localhost:{{stunnel_port}} | |
3. Optional: start Chrome browser in dev mode: | |
$ google-chrome --proxy-server="http://localhost:{{stunnel_port}}" \ | |
--enable-logging --v=1 | |
""" | |
class AppError(Exception): | |
pass | |
def check_sudo_requirements(require_sudo): | |
if not require_sudo: | |
return | |
if not IS_SUDO: | |
raise AppError("You need to have root privileges to run this script." | |
"Please try again, this time using 'sudo'.") | |
required_env_vars = [ | |
'HTTPIE_NSOF_PASSWORD', | |
'HTTPIE_NSOF_USERNAME', | |
'HTTPIE_NSOF_ORG' | |
] | |
for env_var in required_env_vars: | |
assert env_var in os.environ, f"Missing required env var: {env_var}" | |
def check_requirements(require_sudo): | |
try: | |
check_sudo_requirements(require_sudo=require_sudo) | |
required_files = [ | |
'/etc/init.d/stunnel4', | |
'/usr/bin/stunnel4' | |
] | |
for f in required_files: | |
assert os.path.isfile(f), f"Missing required file: {f}" | |
required_dirs = [ | |
STUNNEL_CONFIG_DIR | |
] | |
for d in required_dirs: | |
assert os.path.isdir(d), f"Missing required dir: {d}" | |
except AssertionError as e: | |
raise AppError(e) from e | |
def get_platform(): | |
platforms = dict(linux='Linux', linux2='Linux', darwin='macOS', | |
win32='Windows') | |
return platforms[sys.platform] | |
def generate_memorable_password(): | |
CONSONANT = 'bcdfghjklmnpqrstvwxyz' | |
VOWEL = 'aeiou' | |
parts = [] | |
for _ in range(8): | |
part = [random.choice(CONSONANT if (i % 2) == 0 else VOWEL) | |
for i in range(4)] | |
parts.append("".join(part)) | |
return "-".join(parts) | |
def get_hostname_suffix(zone): | |
return 'com' if zone in ['p', 'stg1'] else 'me' | |
def api_call(zone, endpoint, **kwargs): | |
api_prefix = '' if zone == 'p' else f'{zone}-' | |
hostname_suffix = get_hostname_suffix(zone=zone) | |
hostname = f'https://{api_prefix}api.metanetworks.{hostname_suffix}' | |
url = f'{hostname}{endpoint}' | |
cmd = f'http --auth-type=nsof POST {url}' | |
LOG.debug("API call: %s", cmd) | |
for k, v in kwargs.items(): | |
cmd += f' {k}={v}' | |
output = subprocess.check_output(cmd, shell=True) | |
return output | |
def get_credentials(zone): | |
platform = get_platform() | |
access_id_base = generate_memorable_password() | |
access_id_base_base64 = base64.b64encode( | |
access_id_base.encode('ascii')).decode('ascii') | |
try: | |
output = api_call(zone, SWG_ACCESS_ENDPOINT, platform=platform, | |
access_id_base=access_id_base_base64).decode() | |
except subprocess.CalledProcessError as e: | |
raise AppError(e) from e | |
try: | |
access_response = json.loads(output) | |
except ValueError as e: | |
LOG.debug(output) | |
raise AppError(f"Failed getting access info: {e}") from e | |
assert 'p12' in access_response | |
assert 'password' in access_response | |
assert 'access_id' in access_response | |
p12 = base64.b64decode(access_response['p12'].encode('ascii')) | |
key, certificate, more_certs = load_key_and_certificates( | |
data=p12, password=access_response['password'].encode(), | |
backend=default_backend()) | |
assert isinstance(key, ec.EllipticCurvePrivateKey) | |
assert isinstance(certificate, x509.Certificate) | |
assert isinstance(more_certs, list) | |
for cert in more_certs: | |
assert isinstance(cert, x509.Certificate) | |
return key, certificate, more_certs | |
def configure_stunnel(zone, key, certificate, more_certs, stunnel_port): | |
try: | |
os.makedirs(OUTPUT_DIR) | |
except FileExistsError: | |
pass | |
with open(OUTPUT_CERT_PEM_FILE, 'w') as f: | |
pem = certificate.public_bytes(encoding=serialization.Encoding.PEM) | |
f.write(pem.decode()) | |
with open(OUTPUT_CHAIN_PEM_FILE, 'w') as f: | |
for cert in more_certs: | |
pem = cert.public_bytes(encoding=serialization.Encoding.PEM) | |
f.write(pem.decode()) | |
with open(OUTPUT_KEY_PEM_FILE, 'w') as f: | |
pem = key.private_bytes( | |
encoding=serialization.Encoding.PEM, | |
format=serialization.PrivateFormat.TraditionalOpenSSL, | |
encryption_algorithm=serialization.NoEncryption()) | |
f.write(pem.decode()) | |
os.chmod(OUTPUT_KEY_PEM_FILE, 0o600) | |
with open(OUTPUT_STUNNEL_CONF_FILE, 'w') as f: | |
swg_zone = f'swg{zone}' | |
hostname_suffix = get_hostname_suffix(zone=zone) | |
data = STUNNEL_CONFIG_FILE_TEMPLATE.format( | |
stunnel_port=stunnel_port, swg_zone=swg_zone, | |
hostname_suffix=hostname_suffix, | |
stunnel_config_dir=STUNNEL_CONFIG_DIR, key_file=KEY_FILE, | |
cert_file=CERT_FILE, chain_file=CHAIN_FILE) | |
f.write(data) | |
with open(PROOFPOINT_CA_PEM_FILE, 'w') as f: | |
root_ca_cert = more_certs[-1] | |
pem = root_ca_cert.public_bytes(encoding=serialization.Encoding.PEM) | |
f.write(pem.decode()) | |
def restart_stunnel(): | |
cmd = '/etc/init.d/stunnel4 restart' | |
if IS_SUDO: | |
subprocess.check_output(cmd, shell=True) | |
def show_instructions(stunnel_port): | |
instructions = INSTRUCTIONS_TEMPLATE.format(stunnel_port=stunnel_port) | |
LOG.info(instructions) | |
def parse_args(): | |
parser = argparse.ArgumentParser(description='SWG setup script') | |
parser.add_argument('-z', '--zone', | |
required=True, | |
help='Zone to use w/o swg, e.g. dev') | |
parser.add_argument('-p', '--stunnel-port', | |
type=int, default=DEFAULT_STUNNEL_PORT, | |
help=f'Stunnel port. default={DEFAULT_STUNNEL_PORT}') | |
parser.add_argument('--require-sudo', | |
help='Enforce running as sudo') | |
parser.add_argument('-q', '--quiet', | |
action='store_true', | |
help='Reduce verbosity') | |
return parser.parse_args() | |
def main(): | |
args = parse_args() | |
logging.basicConfig( | |
format='%(asctime)s [%(levelname)s] %(message)s', | |
level=logging.INFO if args.quiet else logging.DEBUG, | |
) | |
try: | |
check_requirements(require_sudo=args.require_sudo) | |
key, certificate, more_certs = get_credentials(zone=args.zone) | |
configure_stunnel(zone=args.zone, key=key, certificate=certificate, | |
more_certs=more_certs, | |
stunnel_port=args.stunnel_port) | |
restart_stunnel() | |
show_instructions(stunnel_port=args.stunnel_port) | |
except AppError as e: | |
LOG.error("Application failure: %s", e) | |
sys.exit(1) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment