Skip to content

Instantly share code, notes, and snippets.

@leobeeson
Created June 8, 2024 09:25
Show Gist options
  • Save leobeeson/c7997f4cf3670951d15857ceef0d6b75 to your computer and use it in GitHub Desktop.
Save leobeeson/c7997f4cf3670951d15857ceef0d6b75 to your computer and use it in GitHub Desktop.
Two methods for using an ontology .ttl file to guide data validation and transformations in an ETL pipeline.
import rdflib
from rdflib.namespace import RDF, RDFS, OWL
# Create an empty RDF graph that will hold the ontology triples.
g = rdflib.Graph()
# Parse the Turtle (.ttl) ontology file into the graph.
# NOTE: the path below is a placeholder; point it at your own ontology file.
ttl_file_path = 'path/to/your/ontology.ttl'
g.parse(ttl_file_path, format='ttl')
# Print the number of triples in the graph as a quick check that parsing
# actually loaded something.
print(f"Graph has {len(g)} statements.")
# Function to extract classes, properties, and required properties
def extract_classes_and_properties(graph: rdflib.Graph) -> dict:
    """Build a per-class data model from an ontology graph.

    Three SPARQL queries are run against ``graph``:

    1. Every subject typed ``owl:Class`` becomes a key of the result.
    2. Every property (``rdf:Property``, ``owl:ObjectProperty`` or
       ``owl:DatatypeProperty``) that has an ``rdfs:domain`` is listed under
       the "properties" of its domain class.
    3. Every property named by an ``owl:Restriction`` whose
       ``owl:minCardinality`` is greater than 0 is listed under the
       "required" of the class given by the restriction's ``owl:onClass``.

    Properties or restrictions that point at a class not found by the first
    query are ignored.

    Args:
        graph: The parsed ontology graph to query.

    Returns:
        A dict mapping each class URI (as ``str``) to
        ``{"properties": [property URIs], "required": [property URIs]}``.
    """
    # Bind the prefixes explicitly so the queries do not depend on which
    # prefixes the parsed file (or the rdflib version) happened to bind.
    prefixes = {"owl": OWL, "rdf": RDF, "rdfs": RDFS}
    data_models = {}

    # The variable is named ?cls rather than ?class so it can be read as an
    # attribute of the result row (``class`` is a reserved word in Python).
    class_query = """
        SELECT DISTINCT ?cls
        WHERE {
            ?cls a owl:Class .
        }
    """
    for row in graph.query(class_query, initNs=prefixes):
        data_models[str(row.cls)] = {"properties": [], "required": []}

    # No inference is applied, so the OWL property types are matched
    # explicitly in addition to plain rdf:Property. DISTINCT collapses the
    # duplicate rows a property carrying several of these types would yield.
    property_query = """
        SELECT DISTINCT ?property ?domain
        WHERE {
            VALUES ?propertyType {
                rdf:Property owl:ObjectProperty owl:DatatypeProperty
            }
            ?property a ?propertyType ;
                      rdfs:domain ?domain .
        }
    """
    for row in graph.query(property_query, initNs=prefixes):
        model = data_models.get(str(row.domain))
        if model is not None:
            model["properties"].append(str(row.property))

    # Required properties: restrictions with minCardinality > 0.
    # NOTE(review): the owning class is read from owl:onClass on the
    # restriction; ontologies that attach restrictions to a class through
    # rdfs:subClassOf will not match this pattern -- confirm against yours.
    required_property_query = """
        SELECT DISTINCT ?property ?domain
        WHERE {
            ?restriction a owl:Restriction ;
                         owl:onProperty ?property ;
                         owl:minCardinality ?minCardinality ;
                         owl:onClass ?domain .
            FILTER (?minCardinality > 0)
        }
    """
    for row in graph.query(required_property_query, initNs=prefixes):
        model = data_models.get(str(row.domain))
        if model is not None:
            model["required"].append(str(row.property))

    return data_models
# Build the data models from the parsed ontology graph and print them so the
# extracted classes and their property lists can be inspected.
data_models: dict = extract_classes_and_properties(g)
print("Extracted data models:", data_models)
# Define Validation and Transformation Functions
def validate_data(data: dict, model: dict) -> bool:
    """Check a record against a class data model.

    A record is valid when every key is one of the model's "properties" and
    every one of the model's "required" properties is present with a
    non-empty value. The first problem found is printed and ends the check.

    Args:
        data: The record to check, keyed by property identifier.
        model: A data model with "properties" and "required" collections.
            A missing entry is treated as empty, so an empty model ``{}``
            accepts only an empty record.

    Returns:
        True if the record is valid, otherwise False.
    """
    allowed = model.get('properties', ())
    required = model.get('required', ())

    for key, value in data.items():
        if key not in allowed:
            print(f"Invalid property: {key}")
            return False
        if key in required and _is_missing_value(value):
            print(f"Missing required property: {key}")
            return False

    # A required property can also be missing by not appearing in the record
    # at all; the loop above only sees keys that are present.
    for key in required:
        if key not in data:
            print(f"Missing required property: {key}")
            return False

    return True


def _is_missing_value(value) -> bool:
    """Return True for None or an empty string/container; 0 and False are real values."""
    if value is None:
        return True
    empty_capable = (str, bytes, list, tuple, set, frozenset, dict)
    return isinstance(value, empty_capable) and not value
def transform_data(data: dict, model: dict) -> dict:
    """Return a copy of ``data`` restricted to the properties the model defines.

    Args:
        data: The record to filter, keyed by property identifier.
        model: A data model with a "properties" collection. A model without
            that entry (for example the empty ``{}`` fallback used when a
            class is not found) defines no properties, so nothing is kept.

    Returns:
        A new dict holding only the key/value pairs whose key is in the
        model's "properties"; ``data`` itself is not modified.
    """
    allowed = model.get('properties', ())
    return {key: value for key, value in data.items() if key in allowed}
# Example record to validate and transform. Replace it with your own data, or
# feed records in from your data ingestion pipeline or from an earlier stage
# of the transformation pipeline. Keys are full property URIs.
person_data = {
    "http://example.org/ontology#name": "John Doe",
    "http://example.org/ontology#email": "john.doe@example.com"
}
# Get the data model for a specific class (example:
# http://example.org/ontology#Person). Falls back to an empty dict when the
# class is not among the extracted data models.
person_model: dict = data_models.get("http://example.org/ontology#Person", {})
# Validate the record against the class model and report the outcome.
is_valid: bool = validate_data(person_data, person_model)
print(f"Is data valid? {is_valid}")
# Transform the record against the same model and print the result.
transformed_person_data: dict = transform_data(person_data, person_model)
print("Transformed data:", transformed_person_data)
############## From ttl file to Pydantic models ##############
from rdflib.namespace import RDF, RDFS, OWL, XSD
# Function to extract classes, properties, and data types
def extract_classes_and_properties(graph: rdflib.Graph) -> dict:
    """Build per-class data models, keyed by local name, with property ranges.

    Three SPARQL queries are run against ``graph``:

    1. Every subject typed ``owl:Class`` becomes a key of the result.
    2. Every property (``rdf:Property``, ``owl:ObjectProperty`` or
       ``owl:DatatypeProperty``) that has both an ``rdfs:domain`` and an
       ``rdfs:range`` is recorded under its domain class, mapped to the
       range URI.
    3. Every property named by an ``owl:Restriction`` whose
       ``owl:minCardinality`` is greater than 0 is listed under the
       "required" of the class given by the restriction's ``owl:onClass``.

    Classes and properties are identified by the local part of their URI, so
    two resources sharing a local name in different namespaces end up under
    the same key.

    Args:
        graph: The parsed ontology graph to query.

    Returns:
        A dict mapping each class local name to
        ``{"properties": {property name: range URI}, "required": [names]}``.
    """
    # Bind the prefixes explicitly so the queries do not depend on which
    # prefixes the parsed file (or the rdflib version) happened to bind.
    prefixes = {"owl": OWL, "rdf": RDF, "rdfs": RDFS}
    data_models = {}

    # The variable is named ?cls rather than ?class so it can be read as an
    # attribute of the result row (``class`` is a reserved word in Python).
    class_query = """
        SELECT DISTINCT ?cls
        WHERE {
            ?cls a owl:Class .
        }
    """
    for row in graph.query(class_query, initNs=prefixes):
        data_models[_local_name(str(row.cls))] = {"properties": {}, "required": []}

    # No inference is applied, so the OWL property types are matched
    # explicitly in addition to plain rdf:Property.
    property_query = """
        SELECT DISTINCT ?property ?domain ?range
        WHERE {
            VALUES ?propertyType {
                rdf:Property owl:ObjectProperty owl:DatatypeProperty
            }
            ?property a ?propertyType ;
                      rdfs:domain ?domain ;
                      rdfs:range ?range .
        }
    """
    for row in graph.query(property_query, initNs=prefixes):
        model = data_models.get(_local_name(str(row.domain)))
        if model is not None:
            model["properties"][_local_name(str(row.property))] = str(row.range)

    # Required properties: restrictions with minCardinality > 0. DISTINCT
    # keeps a property from being listed twice when several restrictions
    # match the same property/class pair.
    # NOTE(review): the owning class is read from owl:onClass on the
    # restriction; ontologies that attach restrictions to a class through
    # rdfs:subClassOf will not match this pattern -- confirm against yours.
    required_property_query = """
        SELECT DISTINCT ?property ?domain
        WHERE {
            ?restriction a owl:Restriction ;
                         owl:onProperty ?property ;
                         owl:minCardinality ?minCardinality ;
                         owl:onClass ?domain .
            FILTER (?minCardinality > 0)
        }
    """
    for row in graph.query(required_property_query, initNs=prefixes):
        model = data_models.get(_local_name(str(row.domain)))
        if model is not None:
            model["required"].append(_local_name(str(row.property)))

    return data_models


def _local_name(uri: str) -> str:
    """Return the part of ``uri`` after the last '#', or after the last '/' if it has no '#'."""
    separator = '#' if '#' in uri else '/'
    return uri.rsplit(separator, 1)[-1]
# Re-extract the data models with the extractor defined directly above; this
# rebinds data_models to its result.
data_models = extract_classes_and_properties(g)
print("Extracted data models:", data_models)
# Mapping RDF data types to Python types. Keys are XSD datatype URIs as
# strings; values are Python type names as strings, since they are written
# into generated source code rather than used as type objects.
datatype_map = {
    str(XSD.string): 'str',
    str(XSD.integer): 'int',
    str(XSD.float): 'float',
    str(XSD.double): 'float',
    str(XSD.boolean): 'bool',
    str(XSD.dateTime): 'datetime',
    str(XSD.date): 'date'
}
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import date, datetime
# Function to generate Pydantic models
def generate_pydantic_models(data_models: dict, datatype_map: dict) -> str:
    """Render Python source code defining one Pydantic model per class.

    Args:
        data_models: Maps a class name to a dict with "properties" (property
            name -> datatype identifier) and "required" (property names).
        datatype_map: Maps a datatype identifier to the Python type name to
            emit; identifiers missing from it are emitted as ``str``.

    Returns:
        The generated module text: an import header followed by one
        ``BaseModel`` subclass per entry of ``data_models``. Required
        properties become plain annotated fields, all others become
        ``Optional[...]`` fields defaulting to ``None``, and a class with no
        properties gets a ``pass`` body.
    """
    chunks = [
        "from pydantic import BaseModel, Field\nfrom typing import List, Optional\nfrom datetime import date, datetime\n\n"
    ]
    for class_name, attributes in data_models.items():
        fields = attributes["properties"]
        lines = [f"class {class_name}(BaseModel):\n"]
        if fields:
            for prop, prop_type in fields.items():
                python_type = datatype_map.get(prop_type, 'str')
                is_required = prop in attributes["required"]
                lines.append(
                    f" {prop}: {python_type}\n"
                    if is_required
                    else f" {prop}: Optional[{python_type}] = None\n"
                )
        else:
            lines.append(" pass\n")
        # Blank line separating this class from the next one.
        lines.append("\n")
        chunks.append("".join(lines))
    return "".join(chunks)
# Render the Pydantic source for the extracted data models and show it.
models_code = generate_pydantic_models(data_models, datatype_map)
print(models_code)
# Write the generated module to disk. The encoding is given explicitly so the
# file's bytes do not depend on the platform's default text encoding.
with open('generated_models.py', 'w', encoding='utf-8') as f:
    f.write(models_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment