mitmproxy invalid peer certificate override hook (WIP)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mitmproxy import tls, ctx | |
from mitmproxy.net import tls as net_tls | |
from mitmproxy.addons.tlsconfig import TlsConfig | |
from OpenSSL import SSL | |
from typing import Optional | |
import json, os.path, functools | |
class UserTlsConfig(TlsConfig):
    """TlsConfig addon that applies per-server TLS overrides from 'tlsconfig.json'.

    The file is looked up in the directory named by the ``confdir`` option and
    is re-read whenever its modification time changes. It must contain a JSON
    object that maps ``"host:port"`` strings to objects with these optional
    boolean keys (default in parentheses):

    * ``verify`` (true)  - verify the upstream server certificate
    * ``sslv3``  (false) - allow SSLv3
    * ``tlsv1``  (false) - allow TLS 1.0
    * ``tlsv11`` (true)  - allow TLS 1.1
    * ``tlsv12`` (true)  - allow TLS 1.2
    * ``tlsv13`` (true)  - allow TLS 1.3

    Servers that are not listed in the file are left untouched by this addon.
    """

    # Parsed and normalised contents of tlsconfig.json, keyed by "host:port".
    __tlsconfig = {}
    # mtime of the tlsconfig.json revision that was last picked up (-1: none yet).
    __tlsconfig_modified = -1
    # Boolean switches an entry may carry, with the value used when a switch
    # is missing (or is not a JSON boolean).
    __flag_defaults = {
        'verify': True,
        'sslv3': False,
        'tlsv1': False,
        'tlsv11': True,
        'tlsv12': True,
        'tlsv13': True,
    }

    def __reload_tlsconfig(self) -> None:
        """Re-read tlsconfig.json if it exists and changed since the last load.

        On any failure (unreadable file, invalid JSON, unexpected structure)
        a warning is logged and the previously loaded configuration is kept.
        """
        if 'confdir' not in ctx.options:
            ctx.log.debug("No 'confdir' option set up. I don't know where to look for the 'tlsconfig.json' file.")
            return

        # expanduser() so that a confdir written with a leading '~' resolves
        # to the user's home directory; it is a no-op for any other path.
        tlsconfig_file = os.path.realpath(
            os.path.join(os.path.expanduser(ctx.options.confdir), 'tlsconfig.json')
        )
        if not os.path.isfile(tlsconfig_file):
            ctx.log.debug("File not found: " + tlsconfig_file)
            return

        tlsconfig_mtime = os.path.getmtime(tlsconfig_file)
        if tlsconfig_mtime == self.__tlsconfig_modified:
            return
        # Recorded before parsing on purpose: a broken file is reported once
        # and then ignored until it is edited again.
        self.__tlsconfig_modified = tlsconfig_mtime

        ctx.log.debug('Reloading: ' + tlsconfig_file)
        try:
            with open(tlsconfig_file, encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as err:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            ctx.log.warn('Parsing failed: ' + tlsconfig_file + ' (' + str(err) + ')')
            return

        well_formed = isinstance(loaded, dict) and all(
            isinstance(entry, dict) for entry in loaded.values()
        )
        if not well_formed:
            ctx.log.warn(
                'Parsing failed: ' + tlsconfig_file
                + ' (expected a JSON object mapping "host:port" to objects)'
            )
            return

        self.__tlsconfig = loaded
        self.__fill_in_tlsconfig_defaults()
        # Reset cached SSL connection context to reset its options
        net_tls.create_proxy_server_context.cache_clear()

    def __fill_in_tlsconfig_defaults(self) -> None:
        """Normalise every loaded entry in place.

        Missing switches get their default, non-boolean switches are replaced
        by the default (with a warning), and the derived ``min_proto``,
        ``max_proto`` and ``options`` values are stored alongside them.
        """
        for address, entry in self.__tlsconfig.items():
            for flag, default in self.__flag_defaults.items():
                value = entry.get(flag, default)
                if not isinstance(value, bool):
                    ctx.log.warn(
                        'Ignoring non-boolean "' + flag + '" for ' + address
                        + ' in tlsconfig.json; using default ' + str(default)
                    )
                    value = default
                entry[flag] = value
            entry['min_proto'] = self.__extract_min_proto_version(entry)
            entry['max_proto'] = self.__extract_max_proto_version(entry)
            entry['options'] = self.__extract_options(entry)

    def __protocol_table(self):
        """Return (switch, protocol version, disabling option) rows, oldest first."""
        # Built on demand so the OpenSSL constants are only looked up when an
        # entry is actually being processed.
        return (
            ('sslv3', SSL.SSL3_VERSION, SSL.OP_NO_SSLv3),
            ('tlsv1', SSL.TLS1_VERSION, SSL.OP_NO_TLSv1),
            ('tlsv11', SSL.TLS1_1_VERSION, SSL.OP_NO_TLSv1_1),
            ('tlsv12', SSL.TLS1_2_VERSION, SSL.OP_NO_TLSv1_2),
            ('tlsv13', SSL.TLS1_3_VERSION, SSL.OP_NO_TLSv1_3),
        )

    def __extract_min_proto_version(self, entry) -> Optional[int]:
        """Return the oldest protocol version enabled in *entry*, or None if none is."""
        for flag, version, _ in self.__protocol_table():
            if entry[flag]:
                return version
        return None

    def __extract_max_proto_version(self, entry) -> Optional[int]:
        """Return the newest protocol version enabled in *entry*, or None if none is."""
        for flag, version, _ in reversed(self.__protocol_table()):
            if entry[flag]:
                return version
        return None

    def __extract_options(self, entry) -> int:
        """Return the OR of the ``SSL.OP_NO_*`` options for every protocol disabled in *entry*."""
        result = 0
        for flag, _, no_option in self.__protocol_table():
            if not entry[flag]:
                result |= no_option
        return result

    def tls_start_server(self, tls_start: tls.TlsData) -> None:
        """Apply the configured overrides to the upstream TLS connection.

        Does nothing (not even the parent implementation) when the server's
        "host:port" is not listed in tlsconfig.json.
        """
        # Doing this before super() call to be able to clear the create_proxy_server_context cache
        self.__reload_tlsconfig()

        server_address = tls_start.context.server.address
        address = f"{server_address[0]}:{server_address[1]}"
        entry = self.__tlsconfig.get(address)
        if entry is None:
            return

        super().tls_start_server(tls_start)
        ctx.log.info('Overriding TLS config: ' + address + ' => ' + str(entry))

        # NOTE(review): this context presumably comes from the cache that the
        # reload clears, so the changes below may also affect other
        # connections that reuse the same cached context - confirm.
        ssl_ctx: SSL.Context = tls_start.ssl_conn.get_context()
        if not entry['verify']:
            ssl_ctx.set_verify(SSL.VERIFY_NONE, None)
        if entry['min_proto'] is not None:
            ssl_ctx.set_min_proto_version(entry['min_proto'])
        if entry['max_proto'] is not None:
            ssl_ctx.set_max_proto_version(entry['max_proto'])
        ssl_ctx.set_options(entry['options'])

        # Replace the connection built by the parent with one created from the
        # modified context (presumably so the new settings take effect).
        tls_start.ssl_conn = SSL.Connection(ssl_ctx)
        sni = tls_start.conn.sni
        if sni:
            # Without this guard a connection with no SNI crashed on None.encode().
            tls_start.ssl_conn.set_tlsext_host_name(sni.encode())
        tls_start.ssl_conn.set_connect_state()
# Module-level addon list; registers a single UserTlsConfig instance.
addons = [UserTlsConfig()]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
    "selfsigned.biz:443": {
        "verify": false
    },
    "oldiesparty.com:11005": {
        "verify": false,
        "sslv3": true,
        "tlsv1": true
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment