Created
May 10, 2024 17:49
-
-
Save DanaEpp/532e55648ebf26e226ebfa6454b9ca43 to your computer and use it in GitHub Desktop.
HAR capture reader for use with the Sensitive Data Detector. See: https://danaepp.com/sensitive-data-detection-using-ai-for-api-hackers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii
import os
from base64 import b64decode
from typing import Iterator, Union

import json_stream
# This HAR capture reader was taken from mitmproxy2swagger and slightly modified to work for our needs.
# See https://github.com/alufers/mitmproxy2swagger/blob/master/mitmproxy2swagger/har_capture_reader.py
class HarFlowWrapper:
    """Accessor wrapper around a single entry of a HAR capture.

    The wrapped ``flow`` is indexed like a mapping with ``"request"`` and
    ``"response"`` keys; headers are read as sequences of
    ``{"name": ..., "value": ...}`` items.
    """

    def __init__(self, flow: dict):
        """Keep a reference to the HAR entry; nothing is copied or validated."""
        self.flow = flow

    def get_url(self):
        """Return the request URL."""
        return self.flow["request"]["url"]

    def get_matching_url(self, prefix) -> Union[str, None]:
        """Get the requests URL if the prefix matches the URL, None otherwise."""
        url = self.flow["request"]["url"]
        if url.startswith(prefix):
            return url
        return None

    def get_method(self):
        """Return the request method."""
        return self.flow["request"]["method"]

    def get_request_headers(self):
        """Return the request headers as ``{name: [value, ...]}``.

        Values of a repeated header name are collected, in order of
        appearance, into that name's list.
        """
        headers = {}
        for kv in self.flow["request"]["headers"]:
            # Create the list on first sight of a name, then append to it.
            headers.setdefault(kv["name"], []).append(kv["value"])
        # The dict was previously built but never returned, so callers
        # always received None.
        return headers

    def get_request_body(self):
        """Return the request's ``postData`` text, or None when absent."""
        if (
            "request" in self.flow
            and "postData" in self.flow["request"]
            and "text" in self.flow["request"]["postData"]
        ):
            return self.flow["request"]["postData"]["text"]
        return None

    def get_response_status_code(self):
        """Return the response ``status`` field."""
        return self.flow["response"]["status"]

    def get_response_reason(self):
        """Return the response ``statusText`` field."""
        return self.flow["response"]["statusText"]

    def get_response_http_version(self):
        """Return the response ``httpVersion`` field, or None when absent."""
        if "response" in self.flow and "httpVersion" in self.flow["response"]:
            return self.flow["response"]["httpVersion"]
        return None

    def get_response_content_type(self) -> str:
        """Return the value of the first response ``Content-Type`` header.

        The header name is compared case-insensitively. Falls back to
        ``"text/plain"`` when the response or the header is missing.
        """
        content_type: str = "text/plain"
        if "response" in self.flow and "headers" in self.flow["response"]:
            for kv in self.flow["response"]["headers"]:
                if kv["name"].lower() == "content-type":
                    content_type = kv["value"]
                    break
        return content_type

    def get_response_headers(self):
        """Return the response headers as ``{name: value}``.

        Unlike ``get_request_headers`` the values are plain strings, so a
        repeated header name keeps only its last value. Returns an empty
        dict when the response or its headers are missing.
        """
        headers = {}
        if "response" in self.flow and "headers" in self.flow["response"]:
            for kv in self.flow["response"]["headers"]:
                headers[kv["name"]] = kv["value"]
        return headers

    def get_response_body(self):
        """Return the response body text, or None when it is unavailable.

        A body whose ``encoding`` is ``"base64"`` is base64-decoded and then
        decoded to ``str`` with the default codec. None is returned when
        there is no body text, or when a base64 body is malformed or does
        not decode to text.
        """
        if not (
            "response" in self.flow
            and "content" in self.flow["response"]
            and "text" in self.flow["response"]["content"]
        ):
            return None

        content = self.flow["response"]["content"]
        text = content["text"]
        if "encoding" in content and content["encoding"] == "base64":
            try:
                return b64decode(text).decode()
            except (binascii.Error, UnicodeDecodeError):
                # binascii.Error: malformed base64 (e.g. a truncated body).
                # UnicodeDecodeError: the payload is binary, not text.
                return None
        return text
class HarCaptureReader:
    """Reads a HAR file and yields its entries wrapped in HarFlowWrapper."""

    def __init__(self, file_path: str, progress_callback=None):
        """Remember the HAR file path and the optional progress callback.

        ``progress_callback``, when truthy, is called once before each entry
        is yielded with the file's current ``tell()`` position divided by the
        file's size on disk.
        """
        self.progress_callback = progress_callback
        self.file_path = file_path

    def captured_requests(self) -> Iterator[HarFlowWrapper]:
        """Yield one HarFlowWrapper per item of ``log.entries`` in the file.

        The file is parsed through ``json_stream`` and stays open for as long
        as the generator is being consumed.
        """
        total_bytes = os.path.getsize(self.file_path)
        with open(self.file_path, "r", encoding="utf-8") as har_file:
            document = json_stream.load(har_file)
            entries = document["log"]["entries"].persistent()
            for raw_entry in entries:
                notify = self.progress_callback
                if notify:
                    notify(har_file.tell() / total_bytes)
                yield HarFlowWrapper(raw_entry)

    def name(self):
        """Return the identifier of this capture format: ``"har"``."""
        return "har"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment