MITMProxy's default dump script is *not* Pandas friendly! Run `mitmdump -s MITM_to_csv.py -q` with this gist to get CSV files for requests and responses. The 'id' column is what uniquely links each request with its response pair -- I might eventually move to a HAR-style dump to join requests and responses. A short pandas loading example follows the script.
from mitmproxy import http
from mitmproxy.net.http import cookies
from mitmproxy.utils import strutils
from datetime import datetime
import csv
import base64
import json
REQUESTS_LOG = 'requests.mitm.csv'
RESPONSES_LOG = 'responses.mitm.csv'
# took out 'text'
resp_flows = ['http_version', 'status_code', 'reason',
              'body_size', 'body_decoded_size', 'mimeType', 'body_compression',
              'content', 'timestamp_start', 'timestamp_end',
              'cookies']
req_flows = ['first_line_format', 'method', 'scheme', 'host',
             'port', 'path', 'http_version', 'content',
             'timestamp_start', 'timestamp_end', 'url',
             'pretty_host', 'pretty_url', 'query', 'cookies',
             'path_components', 'urlencoded_form', 'multipart_form']
connection = ['id', 'address', 'port', 'headers']
requests_buffer = []
responses_buffer = []
print("Initializing log files...")
with open(REQUESTS_LOG, 'w', newline='') as fh:
    writer = csv.writer(fh)
    writer.writerow(connection + req_flows)
with open(RESPONSES_LOG, 'w', newline='') as fh:
    writer = csv.writer(fh)
    writer.writerow(connection + resp_flows)
print("Logs initialized.")
def done():
    """
    Called once on script shutdown, after any other events.
    """
    print("Saving HTTP/HTTPS requests and responses...")
    if len(requests_buffer) > 0:
        print("Saving last requests ({})...".format(len(requests_buffer)))
        with open(REQUESTS_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(requests_buffer)
    if len(responses_buffer) > 0:
        print("Saving last responses ({})...".format(len(responses_buffer)))
        with open(RESPONSES_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(responses_buffer)
    print("Saved.")

def response(flow: http.HTTPFlow) -> None:
    """
    Called when a server response has been received.
    """
    global responses_buffer
    response_body_size = len(flow.response.raw_content)
    response_body_decoded_size = len(flow.response.content)
    response_body_compression = response_body_decoded_size - response_body_size
    headers = {}
    for k, v in flow.response.headers.items():
        headers[k] = v
    headers = json.dumps(headers)
    # Base64-encode binary bodies so they survive the CSV round trip.
    content = ""
    if strutils.is_mostly_bin(flow.response.content):
        content = base64.b64encode(flow.response.content).decode('utf-8')
    else:
        content = flow.response.text
    server = ""
    port = ""
    if flow.server_conn.address is not None:
        server = flow.server_conn.address[0]
        port = flow.server_conn.address[1]
    response_entry = [id(flow), server,
                      port, headers,
                      flow.response.http_version,
                      flow.response.status_code, flow.response.reason,
                      response_body_size, response_body_decoded_size,
                      flow.response.headers.get('Content-Type', ''), response_body_compression,
                      content, datetime.fromtimestamp(flow.response.timestamp_start),
                      datetime.fromtimestamp(flow.response.timestamp_end),
                      json.dumps(format_response_cookies(flow.response.cookies.fields))]
    responses_buffer.append(response_entry)
    if len(responses_buffer) > 2:
        print("Dumping last 3 responses...")
        with open(RESPONSES_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(responses_buffer)
        del responses_buffer[:]
        print("Dumped last 3 responses.")

def request(flow: http.HTTPFlow) -> None:
    """
    Called when a client request has been received.
    """
    global requests_buffer
    headers = {}
    for k, v in flow.request.headers.items():
        headers[k] = v
    headers = json.dumps(headers)
    urlforms = {}
    for k, v in flow.request.urlencoded_form.items():
        urlforms[str(k)] = v
    urlforms = json.dumps(urlforms)
    multipart = {}
    for k, v in flow.request.multipart_form.items(multi=True):
        # Multipart keys and values are bytes; base64-encode binary values.
        if strutils.is_mostly_bin(v):
            v = base64.b64encode(v).decode('utf-8')
        else:
            v = v.decode('utf-8', 'replace')
        multipart[k.decode('utf-8', 'replace')] = v
    multipart = json.dumps(multipart)
    query = {}
    for k, v in flow.request.query.items():
        query[k] = v
    query = json.dumps(query)
    # Base64-encode binary bodies so they survive the CSV round trip.
    content = ""
    if strutils.is_mostly_bin(flow.request.content):
        content = base64.b64encode(flow.request.content).decode('utf-8')
    else:
        content = flow.request.text
    server = ""
    port = ""
    if flow.server_conn.address is not None:
        server = flow.server_conn.address[0]
        port = flow.server_conn.address[1]
    request_entry = [id(flow), server,
                     port, headers,
                     flow.request.first_line_format,
                     flow.request.method, flow.request.scheme,
                     flow.request.host, flow.request.port,
                     flow.request.path, flow.request.http_version, content,
                     datetime.fromtimestamp(flow.request.timestamp_start),
                     datetime.fromtimestamp(flow.request.timestamp_end),
                     flow.request.url, flow.request.pretty_host,
                     flow.request.pretty_url, query,
                     json.dumps(format_request_cookies(flow.request.cookies.fields)),
                     flow.request.path_components,
                     urlforms, multipart]
    requests_buffer.append(request_entry)
    if len(requests_buffer) > 2:
        print("Dumping last 3 requests...")
        with open(REQUESTS_LOG, 'a', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerows(requests_buffer)
        del requests_buffer[:]
        print("Dumped last 3 requests.")

def format_cookies(cookie_list):
    '''
    cookie formatter from
    https://github.com/mitmproxy/mitmproxy/blob/v4.0.4/examples/complex/har_dump.py
    under MIT License.
    '''
    rv = []
    for name, value, attrs in cookie_list:
        cookie_har = {
            "name": name,
            "value": value,
        }
        # HAR only needs some attributes
        for key in ["path", "domain", "comment"]:
            if key in attrs:
                cookie_har[key] = attrs[key]
        # These keys need to be boolean!
        for key in ["httpOnly", "secure"]:
            cookie_har[key] = bool(key in attrs)
        # Expiration time needs to be formatted
        expire_ts = cookies.get_expiration_ts(attrs)
        if expire_ts is not None:
            cookie_har["expires"] = datetime.fromtimestamp(expire_ts).isoformat()
        rv.append(cookie_har)
    return rv

def format_request_cookies(fields):
    return format_cookies(cookies.group_cookies(fields))

def format_response_cookies(fields):
    return format_cookies((c[0], c[1][0], c[1][1]) for c in fields)