Skip to content

Instantly share code, notes, and snippets.

@DanaEpp
Created May 10, 2024 17:49
Show Gist options
  • Save DanaEpp/532e55648ebf26e226ebfa6454b9ca43 to your computer and use it in GitHub Desktop.
Save DanaEpp/532e55648ebf26e226ebfa6454b9ca43 to your computer and use it in GitHub Desktop.
HAR capture reader to use with Sensitive Data Detector. See: https://danaepp.com/sensitive-data-detection-using-ai-for-api-hackers
import os
from base64 import b64decode
from typing import Iterator, Union

import json_stream
# This HAR capture reader was taken from mitmproxy2swagger and slightly modified to work for our needs.
# See https://github.com/alufers/mitmproxy2swagger/blob/master/mitmproxy2swagger/har_capture_reader.py
class HarFlowWrapper:
    """Accessor helpers around one HAR entry (a request/response pair).

    ``flow`` is only ever read with ``in`` and ``[...]``, so any mapping-like
    object exposing ``request`` / ``response`` keys works, not just a ``dict``.
    """

    def __init__(self, flow: dict):
        """Store the HAR entry; nothing is validated or copied here."""
        self.flow = flow

    def get_url(self):
        """Return the request URL."""
        return self.flow["request"]["url"]

    def get_matching_url(self, prefix) -> Union[str, None]:
        """Get the requests URL if the prefix matches the URL, None otherwise."""
        url = self.flow["request"]["url"]
        if url.startswith(prefix):
            return url
        return None

    def get_method(self):
        """Return the request method as stored in the entry."""
        return self.flow["request"]["method"]

    def get_request_headers(self):
        """Return the request headers as ``{name: [value, ...]}``.

        Repeated header names keep every value, in the order they appear.
        Names are used as-is (no case folding).
        """
        headers = {}
        for kv in self.flow["request"]["headers"]:
            # setdefault creates the list the first time a name is seen.
            headers.setdefault(kv["name"], []).append(kv["value"])
        # The original fell off the end here and always returned None.
        return headers

    def get_request_body(self):
        """Return ``request.postData.text``, or None if any level is missing."""
        if (
            "request" in self.flow
            and "postData" in self.flow["request"]
            and "text" in self.flow["request"]["postData"]
        ):
            return self.flow["request"]["postData"]["text"]
        return None

    def get_response_status_code(self):
        """Return the response status as stored in the entry."""
        return self.flow["response"]["status"]

    def get_response_reason(self):
        """Return the response status text."""
        return self.flow["response"]["statusText"]

    def get_response_http_version(self):
        """Return ``response.httpVersion``, or None when it is absent."""
        if "response" in self.flow and "httpVersion" in self.flow["response"]:
            return self.flow["response"]["httpVersion"]
        return None

    def get_response_content_type(self) -> str:
        """Return the first response ``Content-Type`` header value.

        The header name is matched case-insensitively. Falls back to
        ``"text/plain"`` when the response, its headers, or the header itself
        is missing.
        """
        if "response" in self.flow and "headers" in self.flow["response"]:
            for kv in self.flow["response"]["headers"]:
                if kv["name"].lower() == "content-type":
                    return kv["value"]
        return "text/plain"

    def get_response_headers(self):
        """Return the response headers as ``{name: value}``.

        Unlike ``get_request_headers`` this keeps a single value per name, so
        a repeated header collapses to its last occurrence. Returns an empty
        dict when the response or its headers are missing.
        """
        headers = {}
        if "response" in self.flow and "headers" in self.flow["response"]:
            for kv in self.flow["response"]["headers"]:
                name = kv["name"]
                value = kv["value"]
                # Deliberately flat (last value wins), not a list per name.
                headers[name] = value
        return headers

    def get_response_body(self):
        """Return the response body text, decoding base64 content if flagged.

        Returns None when ``response.content.text`` is missing, or when the
        base64-decoded bytes cannot be decoded as text.
        """
        if not (
            "response" in self.flow
            and "content" in self.flow["response"]
            and "text" in self.flow["response"]["content"]
        ):
            return None

        content = self.flow["response"]["content"]
        if "encoding" in content and content["encoding"] == "base64":
            try:
                return b64decode(content["text"]).decode()
            except UnicodeDecodeError:
                # Binary payload (image, archive, ...): no text to hand back.
                return None
        return content["text"]
class HarCaptureReader:
    """Iterate over the entries of a HAR file, wrapping each in HarFlowWrapper."""

    def __init__(self, file_path: str, progress_callback=None):
        """Remember the HAR path and an optional progress callback.

        When set, ``progress_callback`` is called once per entry with the
        current file position divided by the file size.
        """
        self.progress_callback = progress_callback
        self.file_path = file_path

    def captured_requests(self) -> Iterator[HarFlowWrapper]:
        """Yield one HarFlowWrapper per item under ``log.entries``.

        The file is parsed through ``json_stream`` and stays open for as long
        as the generator is being consumed.
        """
        total_size = os.path.getsize(self.file_path)
        with open(self.file_path, "r", encoding="utf-8") as har_file:
            document = json_stream.load(har_file)
            entries = document["log"]["entries"]
            for raw_entry in entries.persistent():
                if self.progress_callback:
                    # NOTE(review): tell() on a text-mode file is treated as a
                    # byte offset here -- an approximation, confirm if exact
                    # progress matters.
                    position = har_file.tell()
                    self.progress_callback(position / total_size)
                yield HarFlowWrapper(raw_entry)

    def name(self):
        """Return the identifier of this reader type."""
        return "har"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment