Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
HTTPS Intercept Proxy
#!/usr/bin/env python2
#Author Phinfinity <rndanish@gmail.com>
"""
Simple hacked up https transparent proxy
works on desktops/laptops running linux
Add iptables rules (in order):
iptables -t nat -A OUTPUT -d 192.168.0.0/16,10.0.0.0/8,172.16.0.0/12,127.0.0.1 -j ACCEPT
iptables -t nat -A OUTPUT -p tcp --dport 443 -j DNAT --to 127.0.0.1:1234
"""
import logging
from logging import handlers
import threading
import os
import signal
from socket import *
import struct
from OpenSSL import SSL
SO_ORIGINAL_DST = 80
PROXY_HOST = "127.0.0.1"
PROXY_PORT = 8080
SERVER_HOST = "0.0.0.0"
SERVER_PORT = 1235
SERVER_CONN_BUFFER = 5
LOG_FILE = "/var/log/squid/intercept_https.log"
log_handler = handlers.WatchedFileHandler(LOG_FILE)
log_handler.setFormatter(logging.Formatter(
'%(asctime)s %(message)s',
'%b %d %H:%M:%S'))
logger = logging.getLogger("proxy")
logger.addHandler(log_handler)
logger.setLevel(logging.DEBUG)
#Adapted from https://gist.github.com/DonnchaC/4a89bf7c52500a1d7e7b
class SSLContext(object):
"""
Simple mocked SSL connection to allow parsing of the ClientHello
"""
def __init__(self):
"""
Initialize an SSL connection object
"""
self.server_name = None
context = SSL.Context(SSL.TLSv1_2_METHOD)
context.set_tlsext_servername_callback(self.get_servername)
self.connection = SSL.Connection(context=context)
self.connection.set_accept_state()
def get_servername(self, connection):
"""
Callback to retrieve the parsed SNI extension when it is parsed
"""
self.server_name = connection.get_servername()
def parse_client_hello(self, client_hello):
# Write the SSL handshake into the BIO memory stream.
self.connection.bio_write(client_hello)
try:
# Start parsing the client handshake from the memory stream
self.connection.do_handshake()
except SSL.Error:
# We don't have a complete SSL handshake, only the ClientHello,
# close the connection once we hit an error.
self.connection.shutdown()
# Should have run the get_servername callback already
return self.server_name
def parseSNI(data):
ssl_context = SSLContext()
return ssl_context.parse_client_hello(data)
def pipe_data(s_from, s_to):
try:
while True:
data = s_from.recv(2048)
if len(data) == 0:
return
s_to.send(data)
from_ip, from_port = s_from.getpeername()
sni_domain = parseSNI(data)
if sni_domain:
logger.info('%s:%d to SNI %s:443 identified', from_ip, from_port, sni_domain)
except error as e:
logger.warn("Error({0}): {1}, ActiveThreadCount: %d".format(e.errno, e.strerror), threading.active_count())
return
finally:
try:
s_from.shutdown(SHUT_RDWR)
s_from.close()
except error:
pass
try:
s_to.shutdown(SHUT_RDWR)
s_to.close()
except error:
pass
def wrap_https_proxy(proxy_s, s, dest_host, dest_port):
# logger.info("Wrapping HTTPS connection on proxy")
https_conn = "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}\r\n\r\n"
https_conn = https_conn.format(host=dest_host, port=dest_port)
proxy_s.send(https_conn)
data = proxy_s.recv(1024)
if not data.startswith("HTTP/1.1"):
logger.info("Wrapped connection failed")
logger.info("Got : %s", data.split('\n')[0])
proxy_s.close()
s.close()
# else:
# logger.info("Wrapped connection looks fine")
def handle_connection(s):
srv_hostname = None
try:
proxy_s = socket(AF_INET, SOCK_STREAM)
proxy_s.connect((PROXY_HOST, PROXY_PORT))
dst = s.getsockopt(SOL_IP, SO_ORIGINAL_DST, 16)
srv_port, srv_ip = struct.unpack("!2xH4s8x", dst)
src_ip, src_port = s.getpeername()
srv_host = inet_ntoa(srv_ip)
try:
srv_hostname, srv_aliaslist, srv_iplist = gethostbyaddr(srv_host)
except:
pass
dst_hostname_str = "rDNS " + srv_hostname if srv_hostname else "IP " + srv_host
logger.info("%s:%d to %s:%d intercepted", src_ip, src_port, dst_hostname_str, srv_port)
wrap_https_proxy(proxy_s, s, srv_host, srv_port)
t1 = threading.Thread(target=pipe_data, args=(s, proxy_s))
t2 = threading.Thread(target=pipe_data, args=(proxy_s, s))
t1.start()
t2.start()
t1.join()
t2.join()
logger.info("%s:%d to %s:%d terminated", src_ip, src_port, dst_hostname_str, srv_port)
finally:
try:
proxy_s.close()
except error:
pass
try:
s.close()
except error:
pass
if __name__ == "__main__":
pid = os.getpid()
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((SERVER_HOST, SERVER_PORT))
s.listen(SERVER_CONN_BUFFER)
logger.info("pid:%d , Listening on %d", pid, SERVER_PORT)
while True:
try:
conn, addr = s.accept()
threading.Thread(target=handle_connection, args=(conn,)).start()
except KeyboardInterrupt:
s.close()
os.kill(pid, signal.SIGKILL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.