Created January 9, 2025 11:47
Tool to find Graphene type definitions (Python) and cross-reference them with Apollo GraphQL usage (TypeScript)
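Example invocation (script name, project path and exclude target are hypothetical; the flags match the argparse setup at the bottom of the script). The tool expects a GraphQL introspection result named schema.json, with the query result under a top-level "data" key, somewhere under the project tree:

    python graphene_xref.py ~/src/myapp reports/graphql_usage.csv --exclude conftest.py --debug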
#!/usr/bin/env python3
"""Find Graphene type definitions (Python) and cross-reference them with
Apollo GraphQL usage (TypeScript)."""
import logging
import argparse
import json
import ast
import os
import re
import pandas as pd
from graphql import (
    parse,
    build_client_schema,
    get_operation_root_type,
    GraphQLList,
    GraphQLNonNull,
)
def filter_dirnames(dirnames):
    # Prune dependency and build-output directories from the os.walk traversal.
    return [
        dirname
        for dirname in dirnames
        if dirname
        not in [
            "env",
            "node_modules",
            "dist",
            "generated",
            "site-packages",
        ]
    ]
def get_relative_filename(projectdir, pathname):
    # os.path.relpath avoids the str.replace pitfall of stripping the project
    # path wherever it recurs inside the filename, not just at the start.
    return os.path.relpath(pathname, projectdir)
def load_schema(projectdir):
    # Locate the introspection result (schema.json) anywhere under the project.
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if filename == "schema.json":
                with open(os.path.join(root, filename)) as f:
                    data = json.load(f)
                return build_client_schema(data["data"])
    raise FileNotFoundError(f"No schema.json found under {projectdir}")
def get_field_type(graphql_type):
    # Unwrap NonNull/List wrappers to reach the underlying named type.
    if isinstance(graphql_type, (GraphQLNonNull, GraphQLList)):
        return get_field_type(graphql_type.of_type)
    return graphql_type
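# Example: a schema field typed [User!]! unwraps to the named type User.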
def load_selection_types(schema, operation_type, selections):
    # Walk the selection set recursively, yielding (field name, named type)
    # pairs. Assumes plain field selections; fragments are not handled.
    for selection in selections:
        field_name = selection.name.value
        if field_name != "__typename":
            field = operation_type.fields[field_name]
            field_type = get_field_type(field.type)
            yield field_name, field_type.name
            if selection.selection_set and hasattr(field_type, "fields"):
                yield from load_selection_types(
                    schema, field_type, selection.selection_set.selections
                )
def load_frontend_definitions(projectdir, schema, excludes=None):
    logging.info("Loading frontend GraphQL type definitions in %s", projectdir)
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename.endswith(".graphql"):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                with open(pathname) as f:
                    document = parse(f.read())
                # Only named operations (query/mutation) matter here;
                # fragment definitions are skipped.
                for definition in document.definitions:
                    if definition.kind == "operation_definition":
                        operation_type = get_operation_root_type(schema, definition)
                        for field_name, field_type in load_selection_types(
                            schema,
                            operation_type,
                            definition.selection_set.selections,
                        ):
                            yield {
                                "filename": get_relative_filename(
                                    projectdir, pathname
                                ),
                                "operation_type": operation_type.name,
                                "definition_name": definition.name.value,
                                "field_name": field_name,
                                "field_type": field_type,
                            }
class GrapheneAnalyzer(ast.NodeVisitor):
    """Collect Graphene ObjectType/Mutation classes and their fields via AST."""

    def __init__(self, projectdir):
        self.projectdir = projectdir
        self.current_filename = None
        self.object_types = {}

    def _get_field_type(self, value):
        # A field assignment looks like `name = graphene.String(...)`;
        # the attribute name of the call is the Graphene field type.
        if isinstance(value, ast.Call) and hasattr(value.func, "attr"):
            return value.func.attr
        return "Unknown"

    def _get_schema_argument(self, node, arg_name):
        # Read a keyword argument by name off a call node (not used by run()).
        for keyword in node.keywords:
            if keyword.arg == arg_name:
                return (
                    keyword.value.id
                    if isinstance(keyword.value, ast.Name)
                    else "Unknown"
                )
        return None

    def visit_ClassDef(self, node):
        # Record classes whose base is `<module>.ObjectType` or `<module>.Mutation`.
        for base in node.bases:
            if isinstance(base, ast.Attribute) and (
                base.attr == "ObjectType" or base.attr == "Mutation"
            ):
                fields = []
                for class_node in node.body:
                    if isinstance(class_node, ast.Assign):
                        for target in class_node.targets:
                            if isinstance(target, ast.Name):
                                field_type = self._get_field_type(class_node.value)
                                fields.append({"name": target.id, "type": field_type})
                self.object_types[node.name] = {
                    "fields": fields,
                    "filename": get_relative_filename(
                        self.projectdir, self.current_filename
                    ),
                }
        self.generic_visit(node)

    def parse_source(self, filename):
        with open(filename) as f:
            return ast.parse(f.read())

    def analyze(self, filename):
        self.current_filename = filename
        tree = self.parse_source(filename)
        self.visit(tree)

    def report(self):
        return self.object_types
def load_backend_definitions(projectdir, excludes=None):
    logging.info("Loading backend Graphene GraphQL type definitions in %s", projectdir)
    analyzer = GrapheneAnalyzer(projectdir)
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename.endswith(".py"):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                analyzer.analyze(pathname)
    return analyzer.report()
def get_usage_keywords(frontend_defs):
    # Map the identifiers that Apollo's TypeScript codegen derives from each
    # operation name back to that operation's definition.
    keywords = {}
    for frontend_def in frontend_defs:
        name = frontend_def["definition_name"]
        keywords[f"{name}Document"] = frontend_def
        if frontend_def["operation_type"] == "Query":
            keywords[f"use{name}Query"] = frontend_def
            keywords[f"use{name}LazyQuery"] = frontend_def
        elif frontend_def["operation_type"] == "Mutation":
            keywords[f"use{name}Mutation"] = frontend_def
    return keywords
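# Example (hypothetical operation name): a query named GetUser produces the
# keys GetUserDocument, useGetUserQuery and useGetUserLazyQuery.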
def get_usage_keyword_regex(usage_keywords):
    return re.compile(r"\b(" + "|".join(usage_keywords.keys()) + r")\b")
def load_frontend_usages(projectdir, frontend_defs, excludes=None):
    logging.info("Loading frontend GraphQL type/hook usages in %s", projectdir)
    usage_keywords = get_usage_keywords(frontend_defs)
    usage_keyword_regex = get_usage_keyword_regex(usage_keywords)
    filename_regex = re.compile(r"\.(jsx?|tsx?)$")
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename_regex.search(filename):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                with open(pathname) as f:
                    content = f.read()
                # Match any generated identifier as a whole word in JS/TS sources.
                for match in usage_keyword_regex.findall(content):
                    frontend_def = usage_keywords[match]
                    yield {
                        "filename": get_relative_filename(projectdir, pathname),
                        "usage": match,
                        "definition": frontend_def,
                    }
def get_df_usage(backend_defs, frontend_defs, frontend_usages):
    # Backend definitions
    df_b = pd.DataFrame(
        [
            dict(field_type=key, filename_backend=value["filename"])
            for key, value in backend_defs.items()
        ]
    )
    # Frontend definitions
    df_f = pd.DataFrame(frontend_defs)[
        ["field_type", "filename", "definition_name"]
    ].rename(columns={"filename": "filename_frontend"})
    # Frontend usages
    df_u = pd.DataFrame(
        [
            dict(
                filename_frontend_usage=elt["filename"],
                definition_name=elt["definition"]["definition_name"],
            )
            for elt in frontend_usages
        ]
    )
    df = df_b.merge(df_f, on="field_type", how="left").merge(
        df_u, on="definition_name", how="left"
    )
    return df.sort_values(by="field_type")
def run(projectdir, outputfile, excludes=None):
    # Analyze GraphQL schema and types via AST
    schema = load_schema(projectdir)
    backend_defs = load_backend_definitions(projectdir, excludes=excludes)
    frontend_defs = list(
        load_frontend_definitions(projectdir, schema, excludes=excludes)
    )
    frontend_usages = list(
        load_frontend_usages(projectdir, frontend_defs, excludes=excludes)
    )
    # Construct DataFrames from analysis results
    df_usage = get_df_usage(backend_defs, frontend_defs, frontend_usages)
    # Write report spreadsheet (guard against a bare filename with no directory)
    outputdir = os.path.dirname(outputfile)
    if outputdir:
        os.makedirs(outputdir, exist_ok=True)
    df_usage.to_csv(outputfile, index=False)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("projectdir")
    parser.add_argument("outputfile")
    parser.add_argument("--exclude", action="append")
    parser.add_argument("--debug", action="store_true")
    args = parser.parse_args()
    logging.basicConfig(
        format="%(levelname)s:%(message)s",
        level=logging.DEBUG if args.debug else logging.INFO,
    )
    run(
        args.projectdir,
        args.outputfile,
        excludes=args.exclude,
    )
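For reference, a minimal sketch (class and field names hypothetical) of the kind of backend definition the AST analyzer picks up:

    import graphene

    class UserType(graphene.ObjectType):
        # visit_ClassDef records this class because its base attribute is
        # "ObjectType"; _get_field_type reads the attribute name of each
        # graphene.*() call, so these fields report as "String" and "Int".
        name = graphene.String()
        age = graphene.Int()

The report joins such backend types (by class name, matched against the field_type of each frontend selection) to the .graphql definitions that select them and to the .js/.ts(x) files that use the generated hooks; the CSV columns are field_type, filename_backend, filename_frontend, definition_name and filename_frontend_usage.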