Last active
September 15, 2021 01:44
-
-
Save havron/a9e2fc93bc14877336643fcc4b53f0bc to your computer and use it in GitHub Desktop.
mitmproxy's default dump script is *not* pandas-friendly! Run `mitmdump -s MITM_to_csv.py -q` with this gist to get CSV files for requests and responses. The 'id' column is what uniquely links each request with its response -- I might eventually move to a HAR-style dump to join requests and responses.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mitmproxy import http | |
from mitmproxy.net.http import cookies | |
from mitmproxy.utils import strutils | |
from datetime import datetime | |
import csv | |
import base64 | |
import json | |
# Output files, created in the current working directory.
REQUESTS_LOG = 'requests.mitm.csv'
RESPONSES_LOG = 'responses.mitm.csv'

# Column names for the per-message part of each row. The order here must
# match the order of the values in the row lists built by response() and
# request().
# took out 'text'
resp_flows = [
    'http_version', 'status_code', 'reason',
    'body_size', 'body_decoded_size', 'mimeType', 'body_compression',
    'content', 'timestamp_start', 'timestamp_end',
    'cookies',
]
req_flows = [
    'first_line_format', 'method', 'scheme', 'host',
    'port', 'path', 'http_version', 'content',
    'timestamp_start', 'timestamp_end', 'url',
    'pretty_host', 'pretty_url', 'query', 'cookies',
    'path_components', 'urlencoded_form', 'multipart_form',
]
# Columns shared by both logs; they come first in every row.
connection = ['id', 'address', 'port', 'headers']

# Rows waiting to be appended to the log files.
requests_buffer = []
responses_buffer = []

# Importing the script truncates both logs and writes their header rows.
# newline='' is what the csv module requires of files handed to csv.writer:
# without it, platforms that translate '\n' emit '\r\r\n' row endings.
print("Initializing log files...")
with open(REQUESTS_LOG, 'w', newline='') as fh:
    writer = csv.writer(fh)
    writer.writerow(connection + req_flows)
with open(RESPONSES_LOG, 'w', newline='') as fh:
    writer = csv.writer(fh)
    writer.writerow(connection + resp_flows)
print("Logs initialized.")
def done():
    """
    Called once on script shutdown, after any other events.

    Appends any rows still held in ``requests_buffer`` and
    ``responses_buffer`` to their CSV logs, then empties the buffers so a
    repeated call cannot write the same rows twice.
    """
    print("Saving HTTP/HTTPs requests and responses...")
    if requests_buffer:
        print("Saving last requests ({})..."
              .format(len(requests_buffer)))
        # newline='' is required by the csv module; rows can contain
        # message bodies with embedded line breaks.
        with open(REQUESTS_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(requests_buffer)
        del requests_buffer[:]
    if responses_buffer:
        print("Saving last responses ({})..."
              .format(len(responses_buffer)))
        with open(RESPONSES_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(responses_buffer)
        del responses_buffer[:]
    print("Saved.")
def response(flow: http.HTTPFlow) -> None:
    """
    Called when a server response has been received.

    Builds one CSV row for the response (connection columns followed by the
    ``resp_flows`` columns), appends it to ``responses_buffer`` and, once the
    buffer holds three rows, appends them to ``RESPONSES_LOG`` and empties
    the buffer.
    """
    # Read the bodies once. Either may be None when mitmproxy holds no body
    # for the message (see mitmproxy's Message.raw_content / content docs),
    # in which case the sizes are recorded as 0 instead of raising on len().
    raw_body = flow.response.raw_content
    decoded_body = flow.response.content
    response_body_size = len(raw_body) if raw_body is not None else 0
    response_body_decoded_size = (
        len(decoded_body) if decoded_body is not None else 0
    )
    # Difference between decoded and on-the-wire size.
    response_body_compression = response_body_decoded_size - response_body_size

    headers = json.dumps(dict(flow.response.headers.items()))

    # Binary bodies are stored base64-encoded; everything else as text.
    if strutils.is_mostly_bin(decoded_body):
        content = base64.b64encode(decoded_body).decode('utf-8')
    else:
        content = flow.response.text

    server = ""
    port = ""
    if flow.server_conn.address is not None:
        server = flow.server_conn.address[0]
        port = flow.server_conn.address[1]

    # id(flow) is the join key against the requests log.
    # NOTE(review): it is an object identity, so the value can be reused
    # after a flow is garbage-collected -- confirm that is acceptable for
    # long captures. It must stay in sync with request().
    response_entry = [
        id(flow), server,
        port, headers,
        flow.response.http_version,
        flow.response.status_code, flow.response.reason,
        response_body_size, response_body_decoded_size,
        flow.response.headers.get('Content-Type', ''), response_body_compression,
        content, datetime.fromtimestamp(flow.response.timestamp_start),
        datetime.fromtimestamp(flow.response.timestamp_end),
        json.dumps(format_response_cookies(flow.response.cookies.fields)),
    ]
    responses_buffer.append(response_entry)

    if len(responses_buffer) > 2:
        print("Dumping last 3 responses...")
        # newline='' is required by the csv module so that line breaks
        # inside the quoted body column survive unmodified.
        with open(RESPONSES_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(responses_buffer)
        del responses_buffer[:]
        print("Dumped last 3 responses.")
def request(flow: http.HTTPFlow) -> None:
    """
    Called when a client request has been received.

    Builds one CSV row for the request (connection columns followed by the
    ``req_flows`` columns), appends it to ``requests_buffer`` and, once the
    buffer holds three rows, appends them to ``REQUESTS_LOG`` and empties
    the buffer.
    """
    headers = json.dumps(dict(flow.request.headers.items()))

    urlforms = json.dumps(
        {str(k): v for k, v in flow.request.urlencoded_form.items()}
    )

    # Multipart keys/values arrive as bytes. Test the raw bytes for binary
    # content (is_mostly_bin compares elements against ints, so handing it
    # a str fails) and store text as decoded UTF-8 rather than a "b'...'"
    # repr. Binary values are stored base64-encoded.
    multipart = {}
    for k, v in flow.request.multipart_form.items(multi=True):
        key = k.decode('utf-8', 'replace') if isinstance(k, bytes) else str(k)
        if isinstance(v, bytes):
            if strutils.is_mostly_bin(v):
                v = base64.b64encode(v).decode('utf-8')
            else:
                v = v.decode('utf-8', 'replace')
        multipart[key] = str(v)
    multipart = json.dumps(multipart)

    query = json.dumps(dict(flow.request.query.items()))

    # Binary bodies are stored base64-encoded; everything else as text.
    body = flow.request.content
    if strutils.is_mostly_bin(body):
        content = base64.b64encode(body).decode('utf-8')
    else:
        content = flow.request.text

    server = ""
    port = ""
    if flow.server_conn.address is not None:
        server = flow.server_conn.address[0]
        port = flow.server_conn.address[1]

    # id(flow) is the join key against the responses log.
    # NOTE(review): it is an object identity, so the value can be reused
    # after a flow is garbage-collected -- confirm that is acceptable for
    # long captures. It must stay in sync with response().
    request_entry = [
        id(flow), server,
        port, headers,
        flow.request.first_line_format,
        flow.request.method, flow.request.scheme,
        flow.request.host, flow.request.port,
        flow.request.path, flow.request.http_version, content,
        datetime.fromtimestamp(flow.request.timestamp_start),
        datetime.fromtimestamp(flow.request.timestamp_end),
        flow.request.url, flow.request.pretty_host,
        flow.request.pretty_url, query,
        json.dumps(format_request_cookies(flow.request.cookies.fields)),
        flow.request.path_components,
        urlforms, multipart,
    ]
    requests_buffer.append(request_entry)

    if len(requests_buffer) > 2:
        print("Dumping last 3 requests...")
        # newline='' is required by the csv module so that line breaks
        # inside the quoted body column survive unmodified.
        with open(REQUESTS_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(requests_buffer)
        del requests_buffer[:]
        print("Dumped last 3 requests.")
def format_cookies(cookie_list):
    '''
    Convert ``(name, value, attrs)`` cookie triples into HAR-style dicts.

    Each dict has "name" and "value", then any of "path", "domain" and
    "comment" present in ``attrs``, then boolean "httpOnly" and "secure"
    flags, and finally an ISO-formatted "expires" when an expiration
    timestamp can be derived from ``attrs``.

    Adapted from
    https://github.com/mitmproxy/mitmproxy/blob/v4.0.4/examples/complex/har_dump.py
    under MIT License.
    '''
    copied_attrs = ("path", "domain", "comment")
    flag_attrs = ("httpOnly", "secure")

    formatted = []
    for name, value, attrs in cookie_list:
        entry = {"name": name, "value": value}
        # HAR only needs some attributes
        entry.update((attr, attrs[attr]) for attr in copied_attrs if attr in attrs)
        # These keys need to be boolean!
        entry.update((flag, flag in attrs) for flag in flag_attrs)
        # Expiration time needs to be formatted
        expires_at = cookies.get_expiration_ts(attrs)
        if expires_at is not None:
            entry["expires"] = datetime.fromtimestamp(expires_at).isoformat()
        formatted.append(entry)
    return formatted
def format_request_cookies(fields):
    """Group the request cookie fields with cookies.group_cookies, then format them."""
    grouped = cookies.group_cookies(fields)
    return format_cookies(grouped)
def format_response_cookies(fields):
    """Flatten each field into a (field[0], field[1][0], field[1][1]) triple and format them."""
    triples = []
    for field in fields:
        name, payload = field[0], field[1]
        triples.append((name, payload[0], payload[1]))
    return format_cookies(triples)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment