Last active
May 3, 2023 12:10
-
-
Save book000/ae5685c7903325f76ffefee115ca67e8 to your computer and use it in GitHub Desktop.
mitmproxyですべての通信をファイルに書き出すアドオン
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from pprint import pprint | |
from mitmproxy import addonmanager, http | |
from mitmproxy import tls | |
from urllib.parse import urlparse | |
from mitmproxy import ctx | |
import os | |
from datetime import datetime | |
base_dir = os.environ.get("BASE_DIR", os.getcwd()) | |
def get_req_http_file_body(flow: http.HTTPFlow): | |
return '\n'.join([ | |
f'{flow.request.method} {flow.request.path} HTTP/1.1', | |
'\n'.join([f'{k}: {v}' for k, v in flow.request.headers.items()]), | |
'', | |
flow.request.content.decode('utf-8'), | |
]) | |
def get_res_http_file_body(flow: http.HTTPFlow): | |
return '\n'.join([ | |
f'HTTP/1.1 {flow.response.status_code} {flow.response.reason}', | |
'\n'.join([f'{k}: {v}' for k, v in flow.response.headers.items()]), | |
'', | |
flow.response.content.decode('utf-8'), | |
]) | |
def sanitize_filename(filename: str): | |
# windows invalid chars: \ / : * ? " < > | | |
filename = filename.replace('\\', '_').replace('/', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_') | |
# windows invalid filenames: CON PRN AUX NUL COM1 COM2 COM3 COM4 COM5 COM6 COM7 COM8 COM9 LPT1 LPT2 LPT3 LPT4 LPT5 LPT6 LPT7 LPT8 LPT9 | |
if filename.lower() in ['con', 'prn', 'aux', 'nul', 'com', 'lpt']: | |
filename = '_' + filename + '_' | |
return filename | |
def sanitize_path(path: str): | |
return os.path.join(*[sanitize_filename(p) for p in path.split('/')]) | |
def get_extension(res: http.Response): | |
content_type = res.headers.get('Content-Type') | |
extensions = { | |
'application/json': 'json', | |
'text/html': 'html', | |
'text/css': 'css', | |
'text/javascript': 'js', | |
'image/png': 'png', | |
'image/jpeg': 'jpg', | |
'image/gif': 'gif', | |
'image/svg+xml': 'svg', | |
'application/pdf': 'pdf', | |
'application/zip': 'zip', | |
'application/x-tar': 'tar', | |
'application/x-rar-compressed': 'rar', | |
'application/x-7z-compressed': '7z', | |
'application/x-bzip': 'bz', | |
'application/x-bzip2': 'bz2', | |
'application/x-xz': 'xz', | |
'application/x-msdownload': 'exe', | |
'application/x-shockwave-flash': 'swf', | |
'application/rtf': 'rtf', | |
'application/octet-stream': 'bin', | |
} | |
if content_type in extensions: | |
return extensions[content_type] | |
# jsonでデコードしてみて、エラーが出なければjsonとする | |
try: | |
res.json() | |
return 'json' | |
except: | |
pass | |
return 'txt' | |
def get_datetime(): | |
return datetime.now().strftime('%Y%m%d%H%M%S.%f') | |
def request(flow: http.HTTPFlow): | |
method = flow.request.method | |
url = flow.request.url | |
host = urlparse(url).netloc | |
req_path = urlparse(url).path | |
path = os.path.join(base_dir, 'req', host, sanitize_path(req_path.lstrip('/')), method.lower(), get_datetime() + '.http') | |
if len(path) > 255: | |
print('path too long: ' + path[0:100]) | |
return | |
os.makedirs(os.path.dirname(path), exist_ok=True) | |
with open(path, 'w', encoding="utf-8") as f: | |
f.write(get_req_http_file_body(flow)) | |
def response(flow: http.HTTPFlow): | |
method = flow.request.method | |
url = flow.request.url | |
host = urlparse(url).netloc | |
req_path = urlparse(url).path | |
result = flow.response.get_content() | |
extension = get_extension(flow.response) | |
if extension == 'json': | |
result = json.dumps(flow.response.json(), indent=2) | |
path = os.path.join(base_dir, 'res', host, sanitize_path(req_path.lstrip('/')), method.lower(), get_datetime() + '.' + extension) | |
if len(path) > 255: | |
print('path too long: ' + path[0:100]) | |
return | |
os.makedirs(os.path.dirname(path), exist_ok=True) | |
if type(result) == bytes: | |
with open(path, 'wb') as f: | |
f.write(result) | |
if type(result) == str: | |
with open(path, 'w', encoding="utf-8") as f: | |
f.write(result) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment