Skip to content

Instantly share code, notes, and snippets.

@simonw

simonw/buggy.py Secret

Created February 27, 2025 19:05
def schema_dsl(schema_dsl: str, multi: bool = False) -> Dict[str, Any]:
    """
    Build a JSON schema from a concise schema string with support for nested objects and arrays.

    Each field has the form ``name [type | {fields} | [items]][: description]``.
    Recognised type names are ``int``, ``float``, ``bool`` and ``str``; a field
    with no type, or with an unrecognised one, is a string. Every field is
    listed as required.

    Args:
        schema_dsl: A string representing a schema in the concise format.
            Fields are newline-separated when the text contains a newline
            outside of any brackets, otherwise comma-separated.
        multi: Boolean, return a schema for an "items" array of these

    Returns:
        A dictionary representing the JSON schema.

    Raises:
        ValueError: If a bracketed structure is unbalanced or a field has
            no name.
    """
    # Type mapping dictionary
    type_mapping = {
        "int": "integer",
        "float": "number",
        "bool": "boolean",
        "str": "string",
    }

    def find_top_level(text: str, targets: str) -> int:
        """Return the index of the first char of ``targets`` outside any brackets, or -1."""
        depth = 0
        for index, char in enumerate(text):
            if char in "[{":
                depth += 1
            elif char in "]}":
                # Clamp so a stray closer cannot hide every later separator.
                depth = max(0, depth - 1)
            elif depth == 0 and char in targets:
                return index
        return -1

    def parse_schema(schema_str: str) -> Dict[str, Any]:
        """Parse a schema string into a JSON schema object."""
        schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
        for field in split_fields(schema_str):
            process_field(field, schema)
        return schema

    def split_fields(text: str) -> List[str]:
        """Split text into fields, either by newline or comma, respecting nesting."""
        # Only a newline *outside* brackets selects newline mode; newlines that
        # merely format a nested structure must not break it into pieces.
        separator = "\n" if find_top_level(text, "\n") != -1 else ","
        pieces = []
        depth = 0
        start = 0
        for index, char in enumerate(text):
            if char in "[{":
                depth += 1
            elif char in "]}":
                depth = max(0, depth - 1)
            elif char == separator and depth == 0:
                pieces.append(text[start:index])
                start = index + 1
        pieces.append(text[start:])
        return [piece.strip() for piece in pieces if piece.strip()]

    def extract_nested_content(text: str, open_char: str, close_char: str) -> str:
        """Extract content between balanced open and close characters."""
        start = text.find(open_char)
        if start == -1:
            return ""
        balance = 1
        pos = start + 1
        while pos < len(text) and balance > 0:
            if text[pos] == open_char:
                balance += 1
            elif text[pos] == close_char:
                balance -= 1
            pos += 1
        if balance != 0:
            raise ValueError(f"Unbalanced {open_char}{close_char} in: {text}")
        return text[start + 1 : pos - 1]

    def is_wrapped(text: str, open_char: str, close_char: str) -> bool:
        """Return True when ``text`` is exactly one balanced open...close group."""
        if not (text.startswith(open_char) and text.endswith(close_char)):
            return False
        try:
            inner = extract_nested_content(text, open_char, close_char)
        except ValueError:
            return False
        # The group opened at position 0 must close on the very last character.
        return len(inner) + 2 == len(text)

    def process_field(field: str, schema: Dict[str, Any]) -> None:
        """Process a field and add it to the schema."""
        # The description starts at the first colon outside brackets, so that
        # descriptions of nested fields stay inside their structure.
        colon = find_top_level(field, ":")
        if colon == -1:
            field_info, description = field, ""
        else:
            field_info = field[:colon]
            description = field[colon + 1 :].strip()

        # Extract field name and additional info
        parts = field_info.strip().split(maxsplit=1)
        if not parts:
            raise ValueError(f"Missing field name in: {field}")
        field_name = parts[0].strip()

        # JSON Schema requires the entries of "required" to be unique.
        if field_name not in schema["required"]:
            schema["required"].append(field_name)

        # Default field schema is string type
        field_schema: Dict[str, Any] = {"type": "string"}

        if len(parts) > 1:
            type_or_structure = parts[1].strip()
            array_at = type_or_structure.find("[")
            object_at = type_or_structure.find("{")
            is_array = array_at != -1 and "]" in type_or_structure
            is_object = object_at != -1 and "}" in type_or_structure
            # When both bracket kinds appear, the one that opens first is the
            # outer structure (e.g. "{tags [str]}" is an object, not an array).
            if is_array and not (is_object and object_at < array_at):
                array_content = extract_nested_content(type_or_structure, "[", "]")
                field_schema = process_array(array_content)
            elif is_object:
                object_content = extract_nested_content(type_or_structure, "{", "}")
                field_schema = parse_schema(object_content)
            elif type_or_structure in type_mapping:
                field_schema = {"type": type_mapping[type_or_structure]}

        # Add description if provided
        if description:
            field_schema["description"] = description

        # Add field to schema
        schema["properties"][field_name] = field_schema

    def process_array(array_content: str) -> Dict[str, Any]:
        """Process array content and return an array schema."""
        array_schema: Dict[str, Any] = {"type": "array"}
        content = array_content.strip()

        if not content:
            # Empty array defaults to array of strings
            array_schema["items"] = {"type": "string"}
        elif is_wrapped(content, "{", "}"):
            # Explicit object items: [{name, age int}]
            array_schema["items"] = parse_schema(content[1:-1])
        elif is_wrapped(content, "[", "]"):
            # Array of arrays: [[int]]
            array_schema["items"] = process_array(content[1:-1])
        elif "," in array_content or find_top_level(content, "\n") != -1:
            # Several fields (comma- or newline-separated): array of objects
            array_schema["items"] = parse_schema(array_content)
        else:
            # Single token: a recognised type name, otherwise default to string
            type_parts = content.split()
            if type_parts and type_parts[0] in type_mapping:
                array_schema["items"] = {"type": type_mapping[type_parts[0]]}
            else:
                array_schema["items"] = {"type": "string"}
        return array_schema

    # Parse the schema
    result = parse_schema(schema_dsl)
    return multi_schema(result) if multi else result
def multi_schema(schema: dict) -> dict:
    """
    Wrap a JSON schema in an object with a single required "items" array.

    Args:
        schema: The JSON schema that each element of the array must satisfy.
            It is embedded by reference, not copied.

    Returns:
        An object schema whose only property, "items", is an array of ``schema``.
    """
    items_property = {"type": "array", "items": schema}
    wrapper = {"type": "object"}
    wrapper["properties"] = {"items": items_property}
    wrapper["required"] = ["items"]
    return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment