@brandtg
Created January 9, 2025 11:47

analyzegraphene.py
#!/usr/bin/env python3
import logging
import argparse
import json
import ast
import os
import re
import pandas as pd
from graphql import (
    parse,
    build_client_schema,
    get_operation_root_type,
    GraphQLList,
    GraphQLNonNull,
)


def filter_dirnames(dirnames):
    return [
        dirname
        for dirname in dirnames
        if dirname
        not in [
            "env",
            "node_modules",
            "dist",
            "generated",
            "site-packages",
        ]
    ]


def get_relative_filename(projectdir, pathname):
    if not projectdir.endswith("/"):
        projectdir += "/"
    return pathname.replace(projectdir, "")


def load_schema(projectdir):
    for root, _dirnames, filenames in os.walk(projectdir):
        for filename in filenames:
            if filename == "schema.json":
                with open(os.path.join(root, filename)) as f:
                    data = json.load(f)
                return build_client_schema(data["data"])


def get_field_type(graphql_type):
    if isinstance(graphql_type, (GraphQLNonNull, GraphQLList)):
        return get_field_type(graphql_type.of_type)
    return graphql_type
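
# get_field_type unwraps list/non-null wrappers to reach the named type: for
# example, a schema field declared as [User!]! arrives here as
# GraphQLNonNull(GraphQLList(GraphQLNonNull(User))) and the named User type is
# returned. ("User" is a hypothetical type used only for illustration.)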


def load_selection_types(schema, operation_type, selections):
    for selection in selections:
        field_name = selection.name.value
        if field_name != "__typename":
            field = operation_type.fields[field_name]
            field_type = get_field_type(field.type)
            yield field_name, field_type.name
            if selection.selection_set and hasattr(field_type, "fields"):
                yield from load_selection_types(
                    schema, field_type, selection.selection_set.selections
                )


def load_frontend_definitions(projectdir, schema, excludes=None):
    logging.info("Loading frontend GraphQL type definitions in %s", projectdir)
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename.endswith(".graphql"):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                with open(pathname) as f:
                    document = parse(f.read())
                for definition in document.definitions:
                    if definition.kind == "operation_definition":
                        operation_type = get_operation_root_type(schema, definition)
                        for field_name, field_type in load_selection_types(
                            schema,
                            operation_type,
                            definition.selection_set.selections,
                        ):
                            yield {
                                "filename": get_relative_filename(
                                    projectdir, pathname
                                ),
                                "operation_type": operation_type.name,
                                "definition_name": definition.name.value,
                                "field_name": field_name,
                                "field_type": field_type,
                            }
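
# Each record yielded by load_frontend_definitions is a flat dict, e.g.
# (values hypothetical):
#   {"filename": "src/queries/GetUser.graphql", "operation_type": "Query",
#    "definition_name": "GetUser", "field_name": "user", "field_type": "User"}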


class GrapheneAnalyzer(ast.NodeVisitor):
    def __init__(self, projectdir):
        self.projectdir = projectdir
        self.current_filename = None
        self.object_types = {}

    def _get_field_type(self, value):
        if isinstance(value, ast.Call) and hasattr(value.func, "attr"):
            return value.func.attr
        return "Unknown"

    def _get_schema_argument(self, node, arg_name):
        for keyword in node.keywords:
            if keyword.arg == arg_name:
                return (
                    keyword.value.id
                    if isinstance(keyword.value, ast.Name)
                    else "Unknown"
                )
        return None

    def visit_ClassDef(self, node):
        for base in node.bases:
            if isinstance(base, ast.Attribute) and (
                base.attr == "ObjectType" or base.attr == "Mutation"
            ):
                fields = []
                for class_node in node.body:
                    if isinstance(class_node, ast.Assign):
                        for target in class_node.targets:
                            if isinstance(target, ast.Name):
                                field_type = self._get_field_type(class_node.value)
                                fields.append({"name": target.id, "type": field_type})
                self.object_types[node.name] = {
                    "fields": fields,
                    "filename": get_relative_filename(
                        self.projectdir, self.current_filename
                    ),
                }
        self.generic_visit(node)

    def parse_source(self, filename):
        with open(filename) as f:
            return ast.parse(f.read())

    def analyze(self, filename):
        self.current_filename = filename
        tree = self.parse_source(filename)
        self.visit(tree)

    def report(self):
        return self.object_types


def load_backend_definitions(projectdir, excludes=None):
    logging.info("Loading backend Graphene GraphQL type definitions in %s", projectdir)
    analyzer = GrapheneAnalyzer(projectdir)
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename.endswith(".py"):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                analyzer.analyze(pathname)
    return analyzer.report()
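
# The AST visitor above records any class whose base attribute is ObjectType
# or Mutation. For a hypothetical backend class such as:
#
#   class User(graphene.ObjectType):
#       name = graphene.String()
#
# the report would contain {"User": {"fields": [{"name": "name",
# "type": "String"}], "filename": "app/schema.py"}} (filename hypothetical).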


def get_usage_keywords(frontend_defs):
    keywords = {}
    for frontend_def in frontend_defs:
        name = frontend_def["definition_name"]
        keywords[f"{name}Document"] = frontend_def
        if frontend_def["operation_type"] == "Query":
            keywords[f"use{name}Query"] = frontend_def
            keywords[f"use{name}LazyQuery"] = frontend_def
        elif frontend_def["operation_type"] == "Mutation":
            keywords[f"use{name}Mutation"] = frontend_def
    return keywords
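
# The keyword names mirror graphql-codegen style generated identifiers: for a
# hypothetical query named GetUser this yields GetUserDocument,
# useGetUserQuery, and useGetUserLazyQuery; for a mutation named UpdateUser it
# yields UpdateUserDocument and useUpdateUserMutation.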


def get_usage_keyword_regex(usage_keywords):
    return re.compile(r"\b(" + "|".join(usage_keywords.keys()) + r")\b")


def load_frontend_usages(projectdir, frontend_defs, excludes=None):
    logging.info("Loading frontend GraphQL type/hook usages in %s", projectdir)
    usage_keywords = get_usage_keywords(frontend_defs)
    usage_keyword_regex = get_usage_keyword_regex(usage_keywords)
    filename_regex = re.compile(r"\.(jsx?|tsx?)$")
    for root, dirnames, filenames in os.walk(projectdir):
        dirnames[:] = filter_dirnames(dirnames)
        for filename in filenames:
            if excludes and filename in excludes:
                continue
            if filename_regex.search(filename):
                pathname = os.path.join(root, filename)
                logging.debug("Processing %s", pathname)
                with open(pathname) as f:
                    content = f.read()
                for match in usage_keyword_regex.findall(content):
                    frontend_def = usage_keywords[match]
                    yield {
                        "filename": get_relative_filename(projectdir, pathname),
                        "usage": match,
                        "definition": frontend_def,
                    }


def get_df_usage(backend_defs, frontend_defs, frontend_usages):
    # Backend definitions
    df_b = pd.DataFrame(
        [
            dict(field_type=key, filename_backend=value["filename"])
            for key, value in backend_defs.items()
        ]
    )
    # Frontend definitions
    df_f = pd.DataFrame(frontend_defs)[
        ["field_type", "filename", "definition_name"]
    ].rename(columns={"filename": "filename_frontend"})
    # Frontend usages
    df_u = pd.DataFrame(
        [
            dict(
                filename_frontend_usage=elt["filename"],
                definition_name=elt["definition"]["definition_name"],
            )
            for elt in frontend_usages
        ]
    )
    df = df_b.merge(df_f, on="field_type", how="left").merge(
        df_u, on="definition_name", how="left"
    )
    return df.sort_values(by="field_type")
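
# The merged frame has one row per (backend type, frontend definition,
# frontend usage) combination, with columns field_type, filename_backend,
# filename_frontend, definition_name, and filename_frontend_usage. Backend
# types with no frontend match keep NaN in the frontend columns because both
# merges are left joins.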


def run(projectdir, outputfile, excludes=None):
    # Analyze GraphQL schema and types via AST
    schema = load_schema(projectdir)
    backend_defs = load_backend_definitions(projectdir, excludes=excludes)
    frontend_defs = list(
        load_frontend_definitions(projectdir, schema, excludes=excludes)
    )
    frontend_usages = list(
        load_frontend_usages(projectdir, frontend_defs, excludes=excludes)
    )
    # Construct DataFrames from analysis results
    df_usage = get_df_usage(backend_defs, frontend_defs, frontend_usages)
    # Write report CSV, creating the output directory only if one was given
    outputdir = os.path.dirname(outputfile)
    if outputdir:
        os.makedirs(outputdir, exist_ok=True)
    df_usage.to_csv(outputfile, index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("projectdir")
    parser.add_argument("outputfile")
    parser.add_argument("--exclude", action="append")
    parser.add_argument("--debug", action="store_true")
    args = parser.parse_args()
    logging.basicConfig(
        format="%(levelname)s:%(message)s",
        level=logging.DEBUG if args.debug else logging.INFO,
    )
    run(
        args.projectdir,
        args.outputfile,
        excludes=args.exclude,
    )
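
# Example invocation (paths and exclude name are hypothetical):
#   python analyzegraphene.py ~/src/myproject reports/graphene_usage.csv \
#       --exclude conftest.py --debug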