Skip to content

Instantly share code, notes, and snippets.

@tomers
Created August 24, 2020 11:38
Show Gist options
  • Save tomers/a5cd0dd6efcf0fe786cbdbbc4bd0dc84 to your computer and use it in GitHub Desktop.
Save tomers/a5cd0dd6efcf0fe786cbdbbc4bd0dc84 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import base64
import json
import logging
import os
import random
import subprocess
import sys
from tempfile import NamedTemporaryFile
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization.pkcs12 import \
load_key_and_certificates
LOG = logging.getLogger(__name__)
SWG_ACCESS_ENDPOINT = '/v1/swg_access'
TEST_ADMIN_PASSWORD = os.environ.get('TEST_ADMIN_PASSWORD', 'MtXREnR4')
IS_SUDO = os.geteuid() == 0
DEFAULT_STUNNEL_PORT = 22022
STUNNEL_CONFIG_DIR = '/etc/stunnel'
KEY_FILE = 'key.pem'
CERT_FILE = 'cert.pem'
CHAIN_FILE = 'chain.pem'
ROOT_CA_FILE = 'proofpoint_root_ca.crt'
SQUID_CONF_FILE = 'squid-client.conf'
PAC_FILE = 'squidward/etc/pac_file.pac'
OUTPUT_DIR = NamedTemporaryFile(prefix='stunnel-config-').name
OUTPUT_CERT_PEM_FILE = os.path.join(OUTPUT_DIR, CERT_FILE)
OUTPUT_CHAIN_PEM_FILE = os.path.join(OUTPUT_DIR, CHAIN_FILE)
OUTPUT_KEY_PEM_FILE = os.path.join(OUTPUT_DIR, KEY_FILE)
PROOFPOINT_CA_PEM_FILE = os.path.join(OUTPUT_DIR, ROOT_CA_FILE)
OUTPUT_STUNNEL_CONF_FILE = os.path.join(OUTPUT_DIR, SQUID_CONF_FILE)
STUNNEL_CONFIG_FILE_TEMPLATE = """
pid = /run/stunnel-squid.pid
debug = 6
foreground = yes
[squid-client]
client = yes
accept = 127.0.0.1:{stunnel_port}
connect = {swg_zone}-geo.swg.metanetworks.{hostname_suffix}:3128
verifyChain = yes
key = {stunnel_config_dir}/{key_file}
cert = {stunnel_config_dir}/{cert_file}
CAfile = {stunnel_config_dir}/{chain_file}
checkHost = {swg_zone}-geo.swg.metanetworks.{hostname_suffix}
"""
STUNNEL_CONFIGURATION_TEMPLATE = \
f"Configuration files stored in {OUTPUT_DIR}" if IS_SUDO else \
f"""- Copy configuration file to stunnel config path:
$ sudo cp -v {OUTPUT_DIR}/* {STUNNEL_CONFIG_DIR}"""
INSTRUCTIONS_TEMPLATE = f"""
Stunnel configuration files ready!
==================================
Stunnel configuration
~~~~~~~~~~~~~~~~~~~~~
{STUNNEL_CONFIGURATION_TEMPLATE}
Running Stunnel
~~~~~~~~~~~~~~~
- As daemon:
$ sudo /etc/init.d/stunnel4 restart
To show logs:
$ tail -f /var/log/stunnel4/stunnel.log
- Standalone:
$ sudo /etc/init.d/stunnel4 stop \\
&& sudo stunnel4 {STUNNEL_CONFIG_DIR}/{SQUID_CONF_FILE}
Logs will be dumped to console
Configuring browser
~~~~~~~~~~~~~~~~~~~
1. Load Proofpoint's root CA certificate in your browser: \
{PROOFPOINT_CA_PEM_FILE}
2. Configure proxy
Choose proxy usage pattern:
- Automatic browser configuration (some traffic will be direct)
Setup your web browser to use PAC file from {PAC_FILE}
Note: make sure to update the PAC to point to your zone
- Manual browser configuration (all traffic goes through proxy)
Either in your browser or in system settings, set proxy server to:
http://localhost:{{stunnel_port}}
3. Optional: start Chrome browser in dev mode:
$ google-chrome --proxy-server="http://localhost:{{stunnel_port}}" \
--enable-logging --v=1
"""
class AppError(Exception):
pass
def check_sudo_requirements(require_sudo):
if not require_sudo:
return
if not IS_SUDO:
raise AppError("You need to have root privileges to run this script."
"Please try again, this time using 'sudo'.")
required_env_vars = [
'HTTPIE_NSOF_PASSWORD',
'HTTPIE_NSOF_USERNAME',
'HTTPIE_NSOF_ORG'
]
for env_var in required_env_vars:
assert env_var in os.environ, f"Missing required env var: {env_var}"
def check_requirements(require_sudo):
try:
check_sudo_requirements(require_sudo=require_sudo)
required_files = [
'/etc/init.d/stunnel4',
'/usr/bin/stunnel4'
]
for f in required_files:
assert os.path.isfile(f), f"Missing required file: {f}"
required_dirs = [
STUNNEL_CONFIG_DIR
]
for d in required_dirs:
assert os.path.isdir(d), f"Missing required dir: {d}"
except AssertionError as e:
raise AppError(e) from e
def get_platform():
platforms = dict(linux='Linux', linux2='Linux', darwin='macOS',
win32='Windows')
return platforms[sys.platform]
def generate_memorable_password():
CONSONANT = 'bcdfghjklmnpqrstvwxyz'
VOWEL = 'aeiou'
parts = []
for _ in range(8):
part = [random.choice(CONSONANT if (i % 2) == 0 else VOWEL)
for i in range(4)]
parts.append("".join(part))
return "-".join(parts)
def get_hostname_suffix(zone):
return 'com' if zone in ['p', 'stg1'] else 'me'
def api_call(zone, endpoint, **kwargs):
api_prefix = '' if zone == 'p' else f'{zone}-'
hostname_suffix = get_hostname_suffix(zone=zone)
hostname = f'https://{api_prefix}api.metanetworks.{hostname_suffix}'
url = f'{hostname}{endpoint}'
cmd = f'http --auth-type=nsof POST {url}'
LOG.debug("API call: %s", cmd)
for k, v in kwargs.items():
cmd += f' {k}={v}'
output = subprocess.check_output(cmd, shell=True)
return output
def get_credentials(zone):
platform = get_platform()
access_id_base = generate_memorable_password()
access_id_base_base64 = base64.b64encode(
access_id_base.encode('ascii')).decode('ascii')
try:
output = api_call(zone, SWG_ACCESS_ENDPOINT, platform=platform,
access_id_base=access_id_base_base64).decode()
except subprocess.CalledProcessError as e:
raise AppError(e) from e
try:
access_response = json.loads(output)
except ValueError as e:
LOG.debug(output)
raise AppError(f"Failed getting access info: {e}") from e
assert 'p12' in access_response
assert 'password' in access_response
assert 'access_id' in access_response
p12 = base64.b64decode(access_response['p12'].encode('ascii'))
key, certificate, more_certs = load_key_and_certificates(
data=p12, password=access_response['password'].encode(),
backend=default_backend())
assert isinstance(key, ec.EllipticCurvePrivateKey)
assert isinstance(certificate, x509.Certificate)
assert isinstance(more_certs, list)
for cert in more_certs:
assert isinstance(cert, x509.Certificate)
return key, certificate, more_certs
def configure_stunnel(zone, key, certificate, more_certs, stunnel_port):
try:
os.makedirs(OUTPUT_DIR)
except FileExistsError:
pass
with open(OUTPUT_CERT_PEM_FILE, 'w') as f:
pem = certificate.public_bytes(encoding=serialization.Encoding.PEM)
f.write(pem.decode())
with open(OUTPUT_CHAIN_PEM_FILE, 'w') as f:
for cert in more_certs:
pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
f.write(pem.decode())
with open(OUTPUT_KEY_PEM_FILE, 'w') as f:
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
f.write(pem.decode())
os.chmod(OUTPUT_KEY_PEM_FILE, 0o600)
with open(OUTPUT_STUNNEL_CONF_FILE, 'w') as f:
swg_zone = f'swg{zone}'
hostname_suffix = get_hostname_suffix(zone=zone)
data = STUNNEL_CONFIG_FILE_TEMPLATE.format(
stunnel_port=stunnel_port, swg_zone=swg_zone,
hostname_suffix=hostname_suffix,
stunnel_config_dir=STUNNEL_CONFIG_DIR, key_file=KEY_FILE,
cert_file=CERT_FILE, chain_file=CHAIN_FILE)
f.write(data)
with open(PROOFPOINT_CA_PEM_FILE, 'w') as f:
root_ca_cert = more_certs[-1]
pem = root_ca_cert.public_bytes(encoding=serialization.Encoding.PEM)
f.write(pem.decode())
def restart_stunnel():
cmd = '/etc/init.d/stunnel4 restart'
if IS_SUDO:
subprocess.check_output(cmd, shell=True)
def show_instructions(stunnel_port):
instructions = INSTRUCTIONS_TEMPLATE.format(stunnel_port=stunnel_port)
LOG.info(instructions)
def parse_args():
parser = argparse.ArgumentParser(description='SWG setup script')
parser.add_argument('-z', '--zone',
required=True,
help='Zone to use w/o swg, e.g. dev')
parser.add_argument('-p', '--stunnel-port',
type=int, default=DEFAULT_STUNNEL_PORT,
help=f'Stunnel port. default={DEFAULT_STUNNEL_PORT}')
parser.add_argument('--require-sudo',
help='Enforce running as sudo')
parser.add_argument('-q', '--quiet',
action='store_true',
help='Reduce verbosity')
return parser.parse_args()
def main():
args = parse_args()
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level=logging.INFO if args.quiet else logging.DEBUG,
)
try:
check_requirements(require_sudo=args.require_sudo)
key, certificate, more_certs = get_credentials(zone=args.zone)
configure_stunnel(zone=args.zone, key=key, certificate=certificate,
more_certs=more_certs,
stunnel_port=args.stunnel_port)
restart_stunnel()
show_instructions(stunnel_port=args.stunnel_port)
except AppError as e:
LOG.error("Application failure: %s", e)
sys.exit(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment