#!/usr/bin/env python3
"""Report GraphQL type usage across a project.

Cross-references backend Graphene type definitions (collected by walking the
Python AST), frontend .graphql operation definitions (resolved against the
introspection schema in schema.json), and frontend source files that reference
the generated Document/hook identifiers, then writes the joined result to a
CSV report.
"""
import argparse
import ast
import json
import logging
import os
import re

import pandas as pd
from graphql import (
    GraphQLList,
    GraphQLNonNull,
    build_client_schema,
    get_operation_root_type,
    parse,
)


def filter_dirnames(dirnames):
    """Drop vendored and generated directories from an os.walk traversal."""
    return [
        dirname
        for dirname in dirnames
        if dirname
        not in [
            "env",
            "node_modules",
            "dist",
            "generated",
            "site-packages",
        ]
    ]


def get_relative_filename(projectdir, pathname):
    """Strip the project root so reported paths are project-relative."""
    if not projectdir.endswith("/"):
        projectdir += "/"
    return pathname.replace(projectdir, "")


def load_schema(projectdir):
    """Find schema.json (an introspection result) and build a client schema."""
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if filename == "schema.json":
                with open(os.path.join(root, filename)) as f:
                    data = json.load(f)
                return build_client_schema(data["data"])
    # Fail loudly here rather than with an opaque error downstream.
    raise FileNotFoundError(f"No schema.json found under {projectdir}")


def get_field_type(graphql_type):
    """Unwrap NonNull/List wrappers to get the underlying named type."""
    if isinstance(graphql_type, (GraphQLNonNull, GraphQLList)):
        return get_field_type(graphql_type.of_type)
    return graphql_type


def load_selection_types(schema, operation_type, selections):
    """Recursively yield (field_name, type_name) pairs for a selection set."""
    for selection in selections:
        field_name = selection.name.value
        if field_name != "__typename":
            field = operation_type.fields[field_name]
            field_type = get_field_type(field.type)
            yield field_name, field_type.name
            # Scalars have no "fields" attribute, so only recurse into
            # object types that carry a nested selection set.
            if selection.selection_set and hasattr(field_type, "fields"):
                yield from load_selection_types(
                    schema, field_type, selection.selection_set.selections
                )


def load_frontend_definitions(projectdir, schema, excludes=None):
    """Yield one record per field selected by each frontend .graphql operation."""
    logging.info("Loading frontend GraphQL type definitions in %s", projectdir)
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename.endswith(".graphql"):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                with open(pathname) as f:
                    document = parse(f.read())
                for definition in document.definitions:
                    if definition.kind == "operation_definition":
                        operation_type = get_operation_root_type(schema, definition)
                        for field_name, field_type in load_selection_types(
                            schema,
                            operation_type,
                            definition.selection_set.selections,
                        ):
                            yield {
                                "filename": get_relative_filename(
                                    projectdir, pathname
                                ),
                                "operation_type": operation_type.name,
                                "definition_name": definition.name.value,
                                "field_name": field_name,
                                "field_type": field_type,
                            }


class GrapheneAnalyzer(ast.NodeVisitor):
    """Collect Graphene ObjectType/Mutation subclasses via the Python AST."""

    def __init__(self, projectdir):
        self.projectdir = projectdir
        self.current_filename = None
        self.object_types = {}

    def _get_field_type(self, value):
        """Return the Graphene attribute name of a field (e.g. "String")."""
        if isinstance(value, ast.Call) and hasattr(value.func, "attr"):
            return value.func.attr
        return "Unknown"

    def _get_schema_argument(self, node, arg_name):
        """Resolve a keyword argument of a call node to a simple name."""
        for keyword in node.keywords:
            if keyword.arg == arg_name:
                return (
                    keyword.value.id
                    if isinstance(keyword.value, ast.Name)
                    else "Unknown"
                )
        return None

    def visit_ClassDef(self, node):
        # Only bases written as attribute access (e.g. graphene.ObjectType)
        # are matched here; bare from-imports would need an ast.Name check.
        for base in node.bases:
            if isinstance(base, ast.Attribute) and base.attr in (
                "ObjectType",
                "Mutation",
            ):
                fields = []
                for class_node in node.body:
                    if isinstance(class_node, ast.Assign):
                        for target in class_node.targets:
                            if isinstance(target, ast.Name):
                                field_type = self._get_field_type(class_node.value)
                                fields.append(
                                    {"name": target.id, "type": field_type}
                                )
                self.object_types[node.name] = {
                    "fields": fields,
                    "filename": get_relative_filename(
                        self.projectdir, self.current_filename
                    ),
                }
        self.generic_visit(node)

    def parse_source(self, filename):
        with open(filename) as f:
            return ast.parse(f.read())

    def analyze(self, filename):
        self.current_filename = filename
        tree = self.parse_source(filename)
        self.visit(tree)

    def report(self):
        return self.object_types


def load_backend_definitions(projectdir, excludes=None):
    """Return {class_name: {fields, filename}} for all Graphene types found."""
    logging.info(
        "Loading backend Graphene GraphQL type definitions in %s", projectdir
    )
    analyzer = GrapheneAnalyzer(projectdir)
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename.endswith(".py"):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                analyzer.analyze(pathname)
    return analyzer.report()


def get_usage_keywords(frontend_defs):
    """Map generated client identifiers (FooDocument, useFooQuery, ...) back
    to the frontend operation definitions they were generated from."""
    keywords = {}
    for frontend_def in frontend_defs:
        name = frontend_def["definition_name"]
        keywords[f"{name}Document"] = frontend_def
        if frontend_def["operation_type"] == "Query":
            keywords[f"use{name}Query"] = frontend_def
            keywords[f"use{name}LazyQuery"] = frontend_def
        elif frontend_def["operation_type"] == "Mutation":
            keywords[f"use{name}Mutation"] = frontend_def
    return keywords


def get_usage_keyword_regex(usage_keywords):
    """Build one alternation regex that matches any usage keyword."""
    return re.compile(r"\b(" + "|".join(usage_keywords.keys()) + r")\b")


def load_frontend_usages(projectdir, frontend_defs, excludes=None):
    """Yield one record per usage keyword found in a .js/.jsx/.ts/.tsx file."""
    logging.info("Loading frontend GraphQL type/hook usages in %s", projectdir)
    usage_keywords = get_usage_keywords(frontend_defs)
    usage_keyword_regex = get_usage_keyword_regex(usage_keywords)
    filename_regex = re.compile(r"\.(jsx?|tsx?)$")
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename_regex.search(filename):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                with open(pathname) as f:
                    content = f.read()
                for match in usage_keyword_regex.findall(content):
                    frontend_def = usage_keywords[match]
                    yield {
                        "filename": get_relative_filename(projectdir, pathname),
                        "usage": match,
                        "definition": frontend_def,
                    }


def get_df_usage(backend_defs, frontend_defs, frontend_usages):
    """Join backend types to frontend definitions and usages via left merges,
    so backend types with no frontend matches keep a row with empty cells."""
    # Backend definitions
    df_b = pd.DataFrame(
        [
            dict(field_type=key, filename_backend=value["filename"])
            for key, value in backend_defs.items()
        ]
    )
    # Frontend definitions
    df_f = pd.DataFrame(frontend_defs)[
        ["field_type", "filename", "definition_name"]
    ].rename(columns={"filename": "filename_frontend"})
    # Frontend usages
    df_u = pd.DataFrame(
        [
            dict(
                filename_frontend_usage=elt["filename"],
                definition_name=elt["definition"]["definition_name"],
            )
            for elt in frontend_usages
        ]
    )
    df = df_b.merge(df_f, on="field_type", how="left").merge(
        df_u, on="definition_name", how="left"
    )
    return df.sort_values(by="field_type")


def run(projectdir, outputfile, excludes=None):
    # Analyze GraphQL schema and types via AST
    schema = load_schema(projectdir)
    backend_defs = load_backend_definitions(projectdir, excludes=excludes)
    frontend_defs = list(
        load_frontend_definitions(projectdir, schema, excludes=excludes)
    )
    frontend_usages = list(
        load_frontend_usages(projectdir, frontend_defs, excludes=excludes)
    )
    # Construct DataFrames from analysis results
    df_usage = get_df_usage(backend_defs, frontend_defs, frontend_usages)
    # Write report spreadsheet; guard the makedirs call because
    # os.path.dirname returns "" for a bare filename and os.makedirs("") raises
    outputdir = os.path.dirname(outputfile)
    if outputdir:
        os.makedirs(outputdir, exist_ok=True)
    df_usage.to_csv(outputfile, index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("projectdir")
    parser.add_argument("outputfile")
    parser.add_argument("--exclude", action="append")
    parser.add_argument("--debug", action="store_true")
    args = parser.parse_args()
    logging.basicConfig(
        format="%(levelname)s:%(message)s",
        level=logging.DEBUG if args.debug else logging.INFO,
    )
    run(
        args.projectdir,
        args.outputfile,
        excludes=args.exclude,
    )