Skip to content

Instantly share code, notes, and snippets.

@pzagalsky
Forked from jkereako/config.yaml
Last active June 18, 2020 08:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pzagalsky/e4c48518c65187447c7679201f0c915f to your computer and use it in GitHub Desktop.
Save pzagalsky/e4c48518c65187447c7679201f0c915f to your computer and use it in GitHub Desktop.
Startup script for MITM Proxy.
scripts: [~/.mitmproxy/scripts/toggle_macos_proxies.py, ~/.mitmproxy/scripts/error_response.py]
view_filter: "~dst amazon.com|google.com"
console_mouse: false
console_palette: dark
console_eventlog_verbosity: error
import json
import time
from urllib.parse import urlparse, parse_qsl
class ErrorResponse(object):
    """
    Stub a response from the server.

    The stub is configured through query parameters on the intercepted
    request:

    * ``delay`` / ``d``  -- seconds to wait before the response is released
    * ``status`` / ``s`` -- HTTP status code to put on the response
    * ``code`` / ``c``   -- error code; when set, the response body is
      replaced with a JSON error document carrying this code

    The parsed values are stored on the addon instance, so they stay in
    effect for later requests until another request overrides them.
    """

    def __init__(self):
        # 0 means "do not delay" / "do not override the status code".
        self.response_delay = 0
        self.status_code = 0
        # An empty error code means "leave the response body untouched".
        self.error_code = ''
        self.error_response = {'errorStatus': {'code': '', 'message': 'Stubbed response from MITM Proxy'}}

    def request(self, flow):
        """
        Read the stub settings from the query string of ``flow.request``.

        Unknown parameters are ignored. A ``delay`` or ``status`` value that
        cannot be parsed as an integer resets that setting to 0.
        """
        for param, arg in self.parse_query_string(flow):
            # Response Delay
            if param in ('delay', 'd'):
                self.response_delay = self._int_or_zero(arg)
            # Status Code
            elif param in ('status', 's'):
                self.status_code = self._int_or_zero(arg)
            # Error Code
            elif param in ('code', 'c'):
                self.error_code = str(arg)

    def responseheaders(self, flow):
        """
        Tag the response as intercepted and apply the stubbed status code.

        The status code is only overridden when a non-zero one is configured.
        """
        flow.response.headers["X-Proxy"] = "Intercepted by MITM Proxy"

        if self.status_code == 0:
            return

        flow.response.status_code = self.status_code

    def response(self, flow):
        """
        Apply the configured delay and, if an error code is set, replace the
        response body with the JSON error document.
        """
        # Delay the response by the specified amount of time
        # NOTE(review): time.sleep blocks the calling thread; confirm this is
        # acceptable for the mitmproxy version in use.
        if self.response_delay > 0:
            time.sleep(self.response_delay)

        if self.error_code == '':
            return

        self.error_response['errorStatus']['code'] = self.error_code
        flow.response.headers['Content-Type'] = 'application/json'
        flow.response.content = json.dumps(self.error_response).encode()

    def parse_query_string(self, flow):
        """
        Return the query parameters of ``flow.request.path`` as a list of
        ``(name, value)`` pairs.
        """
        url = urlparse(flow.request.path)
        return parse_qsl(url.query)

    @staticmethod
    def _int_or_zero(value):
        """Cast ``value`` to an int, falling back to 0 when it is not numeric."""
        try:
            return int(value)
        except ValueError:
            return 0
# Load: expose one ErrorResponse instance through the module-level `addons`
# list (presumably the name mitmproxy looks up when loading this script).
addons = [ErrorResponse()]
import subprocess
class NetworkDeviceInspector(object):
    '''
    Inspects network devices in macOS
    '''

    # Host handed to `route get` to discover which interface carries
    # outbound traffic.
    route_probe_host = "draftkings.com"

    def active_device_name(self):
        '''
        Finds the active device name, e.g. "Wi-Fi", "Thunderbolt Ethernet"

        Raises AssertionError when the active device or its name cannot be
        determined.
        '''
        active_device = self.__find_active_device_identifier()

        # Raised explicitly (instead of an `assert` statement) so the check is
        # not stripped when Python runs with -O.
        if active_device is None:
            raise AssertionError("Unable to detect the active network device")

        return self.__find_active_device_name(active_device)

    def __find_active_device_identifier(self):
        '''
        Finds the active network device identifier (e.g. "en0" or "en1")

        Returns None when the `route` output has no interface entry.
        '''
        # This is the simplest way to determine the active network device.
        args = ["route", "get", self.route_probe_host]
        return self._parse_interface(Helper().execute(args))

    def __find_active_device_name(self, active_device):
        '''
        Finds the name listed by `networksetup -listnetworkserviceorder` for
        a given device identifier

        Raises AssertionError when no entry matches the device.
        '''
        args = ["networksetup", "-listnetworkserviceorder"]
        name = self._parse_port_name(Helper().execute(args), active_device)

        # Unexpected behavior
        if name is None:
            raise AssertionError("Unable find active network device name")

        return name

    @staticmethod
    def _parse_interface(lines):
        '''
        Extracts the value of the "interface" entry from `route get` output.

        Expects lines such as "  interface: en0". Returns None when there is
        no such entry or its value is empty.
        '''
        for line in lines:
            key, separator, value = line.partition(":")
            if separator and key.strip() == "interface":
                return value.strip("\"' ") or None

        return None

    @staticmethod
    def _parse_port_name(lines, active_device):
        '''
        Extracts the hardware port name of the entry whose device is exactly
        `active_device`.

        Expects lines such as "(Hardware Port: Wi-Fi, Device: en0)". The
        device is compared for equality so that "en1" does not match "en10".
        Returns None when no entry matches.

        NOTE(review): this returns the "Hardware Port" label; the value is
        later passed to networksetup as the service name -- confirm the two
        are the same on the target machine.
        '''
        for line in lines:
            head, separator, tail = line.partition("Device:")
            if not separator or tail.strip(" )") != active_device:
                continue

            # head looks like "(Hardware Port: Wi-Fi, " -- keep what follows
            # the first colon and drop the trailing comma.
            return head.partition(":")[2].strip().rstrip(",").strip("\"' ")

        return None
class ProxyManager(object):
    '''
    Inspects and manipulates proxies in macOS
    '''

    def __init__(self, device_name, server="localhost", port="8080"):
        '''
        device_name: network service the proxies are configured on
        server:      proxy host to configure (defaults to "localhost")
        port:        proxy port to configure (defaults to "8080")
        '''
        self.device_name = device_name
        self.server = server
        # Stored as text because it is compared against, and passed to,
        # the networksetup command line.
        self.port = str(port)

    def are_proxies_already_set(self):
        '''
        Checks whether both the HTTP and HTTPS web proxies of
        'self.device_name' already point at 'self.server':'self.port'.

        Returns False as soon as one of them reports a different server or
        port, True otherwise.
        '''
        helper = Helper()

        for get_web_proxy_arg in ("-getwebproxy", "-getsecurewebproxy"):
            args = ["networksetup", get_web_proxy_arg, self.device_name]
            if not self._settings_match(helper.execute(args), self.server, self.port):
                return False

        return True

    def set_proxies(self):
        '''
        Sets both the HTTP and HTTPS web proxies for the argument
        'self.device_name' to 'self.server':'self.port'.
        '''
        helper = Helper()

        for set_web_proxy_arg in ("-setwebproxy", "-setsecurewebproxy"):
            args = ["networksetup", set_web_proxy_arg, self.device_name, self.server, self.port]
            helper.execute(args)

    def deactivate_proxies(self):
        '''
        Deactivates both the HTTP and HTTPS web proxies for the argument
        'self.device_name'.
        '''
        self.__toggle_web_proxies(self.device_name, "off")

    def activate_proxies(self):
        '''
        Activates both the HTTP and HTTPS web proxies for the argument
        'self.device_name'.
        '''
        self.__toggle_web_proxies(self.device_name, "on")

    def __toggle_web_proxies(self, service_name, state="on"):
        '''
        Toggles both the HTTP and HTTPS web proxies for the argument
        'service_name'.
        '''
        helper = Helper()

        for set_web_proxy_state_arg in ("-setwebproxystate", "-setsecurewebproxystate"):
            args = ["networksetup", set_web_proxy_state_arg, service_name, state]
            helper.execute(args)

    @staticmethod
    def _settings_match(lines, server, port):
        '''
        Compares "Key: value" lines against the expected server and port.

        Returns False when a "Server" or "Port" line carries a different
        value, True otherwise. Lines with other keys are ignored.
        '''
        for line in lines:
            # Split on the first colon only, so values that themselves
            # contain colons (e.g. IPv6 addresses) stay intact, and trim the
            # space that follows the colon before comparing.
            key, separator, value = line.partition(":")
            if not separator:
                continue

            key = key.strip("\"' ")
            value = value.strip("\"' ")

            if key == "Server" and value != server:
                return False
            if key == "Port" and value != port:
                return False

        return True
class Helper(object):
    '''
    Internal helper methods.
    '''

    def execute(self, arg_list):
        '''
        Runs the command given as an argument list and returns its standard
        output as a list of text lines.

        The argument list is printed before the command runs.
        subprocess.check_output raises CalledProcessError when the command
        exits with a non-zero status.
        '''
        print(arg_list)

        raw_output = subprocess.check_output(arg_list)
        text = raw_output.decode("utf-8")

        return text.splitlines()
#-- MITM events
class ProxyToggler(object):
    '''
    Points the active network device's web proxies at the proxy when the
    script is loaded and switches them off again when it is done.
    '''

    def load(self, l):
        '''
        Configures the web proxies of the active device if they are not set
        yet, then turns them on.

        The argument 'l' is not used here.
        '''
        inspector = NetworkDeviceInspector()
        manager = ProxyManager(inspector.active_device_name())

        already_set = manager.are_proxies_already_set()
        if not already_set:
            manager.set_proxies()

        manager.activate_proxies()

    def done(self):
        '''
        Turns the web proxies of the active device off.
        '''
        inspector = NetworkDeviceInspector()
        ProxyManager(inspector.active_device_name()).deactivate_proxies()
# Expose a single ProxyToggler instance through the module-level addons list.
addons = [ProxyToggler()]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment