-
-
Save pzagalsky/e4c48518c65187447c7679201f0c915f to your computer and use it in GitHub Desktop.
Startup script for MITM Proxy.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
scripts: [~/.mitmproxy/scripts/toggle_macos_proxies.py, ~/.mitmproxy/scripts/error_response.py] | |
view_filter: "~dst amazon.com|google.com" | |
console_mouse: false | |
console_palette: dark | |
console_eventlog_verbosity: error |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
from urllib.parse import urlparse, parse_qsl | |
class ErrorResponse(object): | |
""" | |
Stub a response from the server. | |
""" | |
def __init__(self): | |
self.response_delay = 0 | |
self.status_code = 0 | |
self.error_code = '' | |
self.error_response = {'errorStatus': {'code': '', 'message': 'Stubbed response from MITM Proxy'}} | |
def request(self, flow): | |
query_pairs = self.parse_query_string(flow) | |
for query_pair in query_pairs: | |
param = query_pair[0] | |
arg = query_pair[1] | |
# Error Code | |
elif param == 'code' or param == 'c': | |
self.error_code = str(arg) | |
# Checks for a value in the parameter, attempts to cast it to an int and, | |
# if the cast fails, then reset the instance properties to their default | |
# values. | |
# Response Delay | |
if param == 'delay' or param == 'd': | |
try: | |
self.response_delay = int(arg) | |
except ValueError: | |
self.response_delay = 0 | |
# Status Code | |
elif param == 'status' or param == 's': | |
try: | |
self.status_code = int(arg) | |
except ValueError: | |
self.status_code = 0 | |
def responseheaders(self, flow): | |
flow.response.headers["X-Proxy"] = "Intercepted by MITM Proxy" | |
if self.status_code == 0: | |
return | |
flow.response.status_code = self.status_code | |
def response(self, flow): | |
# Delay the response by the specified amount of time | |
if self.response_delay > 0: | |
time.sleep(self.response_delay) | |
if self.error_code == '': | |
return | |
self.error_response['errorStatus']['code'] = self.error_code | |
flow.response.headers['Content-Type'] = 'application/json' | |
flow.response.content = json.dumps(self.error_response).encode() | |
def parse_query_string(self, flow): | |
url = urlparse(flow.request.path) | |
return parse_qsl(url.query) | |
# Load | |
addons = [ErrorResponse()] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
class NetworkDeviceInspector(object): | |
''' | |
Inspects network devices in macOS | |
''' | |
def active_device_name(self): | |
''' | |
Finds the active device name, e.g. "Wi-Fi", "Thunderbolt Ethernet" | |
''' | |
active_device = self.__find_active_device_identifier() | |
assert active_device is not None, "Unable to detect the active network device" | |
return self.__find_active_device_name(active_device) | |
def __find_active_device_identifier(self): | |
''' | |
Finds the active network device identifier (e.g. "en0" or "en1") | |
''' | |
# This is the simplest way to determine the active network device. | |
args = ["route", "get", "draftkings.com"] | |
lines = Helper().execute(args) | |
substring = "interface" | |
for line in lines: | |
string = str(line) | |
if substring in string: | |
# output: b' interface: en0' | |
# | |
# Splits the string on the colon to remove the first half, index | |
# the last element and then remove unwanted string | |
return string.split(":")[-1].strip("\"' ") | |
def __find_active_device_name(self, active_device): | |
''' | |
Finds the network service name for a given device identifier | |
''' | |
args = ["networksetup", "-listnetworkserviceorder"] | |
lines = Helper().execute(args) | |
for line in lines: | |
string = str(line) | |
if active_device in string: | |
# output: b'(Hardware Port: Wi-Fi, Device: en0)' | |
name = string.split(",")[0].split(":")[-1].strip("\"' ") | |
return name | |
# Unexpected behavior | |
assert False, "Unable find active network device name" | |
class ProxyManager(object): | |
''' | |
Inspects and manipulates proxies in macOS | |
''' | |
def __init__(self, device_name): | |
self.device_name = device_name | |
self.server = "localhost" | |
self.port = "8080" | |
def are_proxies_already_set(self): | |
''' | |
Sets both the HTTP and HTTPS web proxies for the argument | |
'self.device_name'. | |
''' | |
helper = Helper() | |
get_web_proxy_args = ["-getwebproxy", "-getsecurewebproxy"] | |
for get_web_proxy_arg in get_web_proxy_args: | |
args = ["networksetup", get_web_proxy_arg, self.device_name] | |
lines = helper.execute(args) | |
for line in lines: | |
split_line = line.strip("\"' ").split(":") | |
if split_line[0] == "Server": | |
if split_line[-1] != self.server: | |
return False | |
elif split_line[0] == "Port": | |
if split_line[-1] != self.port: | |
return False | |
return True | |
def set_proxies(self): | |
''' | |
Sets both the HTTP and HTTPS web proxies for the argument | |
'self.device_name'. | |
''' | |
helper = Helper() | |
set_web_proxy_args = ["-setwebproxy", "-setsecurewebproxy"] | |
for set_web_proxy_arg in set_web_proxy_args: | |
args = ["networksetup", set_web_proxy_arg, self.device_name, "localhost", "8080"] | |
helper.execute(args) | |
def deactivate_proxies(self): | |
''' | |
Deactivates both the HTTP and HTTPS web proxies for the argument | |
'self.device_name'. | |
''' | |
self.__toggle_web_proxies(self.device_name, "off") | |
def activate_proxies(self): | |
''' | |
Activates both the HTTP and HTTPS web proxies for the argument | |
'self.device_name'. | |
''' | |
self.__toggle_web_proxies(self.device_name, "on") | |
def __toggle_web_proxies(self, service_name, state="on"): | |
''' | |
Toggles both the HTTP and HTTPS web proxies for the argument | |
'service_name'. | |
''' | |
helper = Helper() | |
set_web_proxy_state_args = ["-setwebproxystate", "-setsecurewebproxystate"] | |
for set_web_proxy_state_arg in set_web_proxy_state_args: | |
args = ["networksetup", set_web_proxy_state_arg, service_name, state] | |
helper.execute(args) | |
class Helper(object): | |
''' | |
Internal helper methods. | |
''' | |
def execute(self, arg_list): | |
print(arg_list) | |
# Use this line instead to see the actual reported error | |
# result = subprocess.run(arg_list, stdout=subprocess.PIPE).stdout | |
result = subprocess.check_output(arg_list) | |
return result.decode("utf-8").splitlines() | |
#-- MITM events | |
class ProxyToggler(object): | |
def load(self, l): | |
device_inspector = NetworkDeviceInspector() | |
active_device = device_inspector.active_device_name() | |
proxy_manager = ProxyManager(active_device) | |
if not proxy_manager.are_proxies_already_set(): | |
proxy_manager.set_proxies() | |
proxy_manager.activate_proxies() | |
def done(self): | |
device_inspector = NetworkDeviceInspector() | |
active_device = device_inspector.active_device_name() | |
proxy_manager = ProxyManager(active_device) | |
proxy_manager.deactivate_proxies() | |
addons = [ | |
ProxyToggler() | |
] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment