Skip to content

Instantly share code, notes, and snippets.

@abbycantcode
Created May 3, 2024 05:01
Show Gist options
  • Save abbycantcode/4352002cd5370e6080879ae5fda3a179 to your computer and use it in GitHub Desktop.
Save abbycantcode/4352002cd5370e6080879ae5fda3a179 to your computer and use it in GitHub Desktop.
Parses a Caido JSON project export, walks every recorded request, and saves the GraphQL queries it finds.
import json
import os
import base64
import argparse
from graphql import parse
def parse_http_request(http_request):
    """Split a raw HTTP request into its decoded JSON body and Host header.

    Args:
        http_request: The full request text, using CRLF line endings.

    Returns:
        A ``(json_body, host)`` tuple. ``(None, None)`` is returned when the
        request has no blank line separating headers from body, or when the
        body is not valid JSON. ``host`` is ``None`` when no Host header is
        present.
    """
    # Split on the FIRST blank line only: the body may itself contain
    # "\r\n\r\n" (e.g. pretty-printed JSON) and must be kept in one piece.
    head, separator, body = http_request.partition('\r\n\r\n')
    if not separator:
        return None, None

    host = None
    for line in head.split('\r\n'):
        name, colon, value = line.partition(':')
        # Header field names are case-insensitive ("Host", "host", ...).
        if colon and name.strip().lower() == 'host':
            host = value.strip()
            break

    try:
        return json.loads(body), host
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None, None
def is_graphql_query(content):
    """Report whether *content* holds a GraphQL document under ``'query'``.

    *content* may be a JSON string (decoded first) or an already-decoded
    object. Any failure along the way -- invalid JSON, a missing ``'query'``
    entry, or a document that ``parse`` rejects -- yields ``False``.
    """
    payload = content
    try:
        if isinstance(payload, str):
            payload = json.loads(payload)
        parse(payload['query'])
    except Exception:
        return False
    else:
        return True
def extract_variables(query):
    """Collect the names of the variables declared in a GraphQL document.

    Args:
        query: GraphQL document source text.

    Returns:
        A list of variable names in declaration order (duplicates are kept).
        An empty list is returned when the document cannot be parsed.
    """
    try:
        document = parse(query)
    except Exception:
        # Best effort: an unparseable document simply contributes no names.
        return []

    variables = []
    for definition in document.definitions:
        # The attribute may be missing or unset (None) depending on the kind
        # of definition; treat both as "declares nothing" so one such
        # definition does not discard the variables of the others.
        declared = getattr(definition, 'variable_definitions', None) or ()
        variables.extend(var.variable.name.value for var in declared)
    return variables
def _safe_file_stem(name, fallback="Unnamed_Operation"):
    """Turn *name* into text that is safe to use as a single file name."""
    cleaned = "".join(
        ch if ch.isalnum() or ch in "-_." else "_" for ch in str(name)
    )
    return cleaned or fallback


def create_query_file(directory, operation_name, query):
    """Write *query* to ``<operation_name>.graphql`` inside *directory*.

    The operation name originates from captured traffic, so path separators
    and other unusual characters are replaced with ``_`` to keep the file
    inside *directory*. An existing file of the same name is overwritten.

    Args:
        directory: Existing directory that receives the file.
        operation_name: Name used as the file stem.
        query: GraphQL document text to store.
    """
    file_name = f"{_safe_file_stem(operation_name)}.graphql"
    file_path = os.path.join(directory, file_name)
    with open(file_path, "w", encoding="utf-8") as file:
        file.write(query)
def save_operation_name(directory, operation_name):
    """Record *operation_name* in ``operation_name.txt`` unless already listed.

    The file holds one name per line. Names are compared against whole
    lines, so a name that is merely a substring of a recorded one (``Get``
    vs ``GetUser``) is still added.

    Args:
        directory: Existing directory containing (or receiving) the file.
        operation_name: Operation name to record.
    """
    file_path = os.path.join(directory, "operation_name.txt")
    try:
        with open(file_path, encoding="utf-8") as file:
            known = set(file.read().splitlines())
    except FileNotFoundError:
        known = set()

    if operation_name in known:
        return
    # Append mode creates the file on first use.
    with open(file_path, "a", encoding="utf-8") as file:
        file.write(operation_name + '\n')
def save_variables(directory, variables):
    """Append previously unseen variable names to ``variables.txt``.

    The file holds one name per line. Each name is compared against whole
    lines already in the file and against names written earlier in the same
    call, so every name is stored at most once. The file is created even
    when *variables* is empty.

    Args:
        directory: Existing directory containing (or receiving) the file.
        variables: Iterable of variable names.
    """
    file_path = os.path.join(directory, "variables.txt")
    try:
        with open(file_path, encoding="utf-8") as file:
            known = set(file.read().splitlines())
    except FileNotFoundError:
        known = set()

    with open(file_path, "a", encoding="utf-8") as file:
        for variable in variables:
            if variable in known:
                continue
            # Track in memory: writes are buffered, so re-reading the file
            # would not see names added earlier in this loop.
            known.add(variable)
            file.write(variable + "\n")
def process_graphql_query(content, output_directory, host):
    """Store one GraphQL request payload under ``<output_directory>/<host>``.

    The query text is written to ``queries/<operation>.graphql``; the
    operation name and the declared variable names are recorded in the host
    directory through ``save_operation_name`` and ``save_variables``.

    Args:
        content: Request payload, either a JSON string or the decoded
            object. It must contain a ``'query'`` entry.
        output_directory: Root directory for all output.
        host: Host the request was sent to; used as the sub-directory name.
    """
    if isinstance(content, str):
        try:
            data = json.loads(content)
        except json.JSONDecodeError as error:
            print("Skipping GraphQL payload with invalid JSON:", error)
            return
    else:
        data = content

    # Clients commonly send "operationName": null, so a plain dict default
    # is not enough: fall back for any missing, empty, or non-string value.
    operation_name = data.get('operationName')
    if not isinstance(operation_name, str) or not operation_name:
        operation_name = 'Unnamed_Operation'
    query = data['query']
    variables = extract_variables(query)

    host_directory = os.path.join(output_directory, host)
    queries_directory = os.path.join(host_directory, "queries")
    os.makedirs(queries_directory, exist_ok=True)

    create_query_file(queries_directory, operation_name, query)
    save_operation_name(host_directory, operation_name)
    save_variables(host_directory, variables)
def parse_request(request, output_directory):
    """Handle one request record: dump it and store it if it is GraphQL.

    Only ``POST`` records are considered. The base64 ``raw`` field is decoded
    as UTF-8 text; if that (or reading ``method``/``raw``) fails, the error
    is printed and the record is skipped. The decoded request is printed,
    then handed to ``process_graphql_query`` when it has a JSON body, a Host
    header, and a parseable GraphQL query.
    """
    try:
        if request["method"] != "POST":
            return
        encoded = request["raw"]
        raw_data = base64.b64decode(encoded).decode("utf-8")
    except Exception as e:
        print("Error decoding raw data:", e)
        return

    print("Parsed Request:")
    for label, key in (("Method:", "method"), ("Path:", "path"), ("Query:", "query")):
        print(label, request[key])
    print("Raw Data:")
    print(raw_data)
    print()

    json_body, host = parse_http_request(raw_data)
    if not (json_body and host):
        return
    if is_graphql_query(json_body):
        process_graphql_query(json_body, output_directory, host)
if __name__ == "__main__":
    # Command-line entry point: read the exported request list and process
    # every record, writing results below the chosen output directory.
    parser = argparse.ArgumentParser(description="Parse GraphQL requests from input JSON file")
    parser.add_argument("input_file", help="Path to the input JSON file")
    parser.add_argument("-o", "--output_directory", help="Output directory for parsed data", default=os.getcwd())
    args = parser.parse_args()

    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(args.input_file, "r", encoding="utf-8") as file:
        data = json.load(file)

    output_directory = args.output_directory
    os.makedirs(output_directory, exist_ok=True)

    for request in data:
        parse_request(request, output_directory)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment