Skip to content

Instantly share code, notes, and snippets.

@jacopotagliabue
Last active November 11, 2024 16:32
Show Gist options
  • Save jacopotagliabue/30d30566d6a9245aabbb28fe5d7d26bb to your computer and use it in GitHub Desktop.
Save jacopotagliabue/30d30566d6a9245aabbb28fe5d7d26bb to your computer and use it in GitHub Desktop.
Graph inference for FaaS planning (sample, generated Python code, before Kuzu)
from typing import List, Optional, Set
class Column:
    """Minimal value holder for a column, identified only by its name."""

    def __init__(self, name: str):
        # The name is the only state a column carries.
        self.name = name

    def __repr__(self):
        # Mirrors the constructor call so instances are easy to spot in logs.
        return "Column(name={})".format(self.name)
class Table:
    """A named set of columns with an optional parent table.

    Constructing a table with a parent appends the new table to that
    parent's ``children`` list, so the hierarchy can be walked both upwards
    (``parent``) and downwards (``children``).
    """

    def __init__(
        self,
        name: str,
        columns: Set[Column],
        parent: Optional['Table'] = None,
        filter_condition: Optional[str] = None,
    ):
        self.name = name
        self.columns = columns  # Column objects, stored exactly as passed in
        self.parent = parent
        self.filter_condition = filter_condition
        self.children = []  # populated by child tables as they are constructed
        if not parent:
            return
        # Register this table with its parent so the parent can reach it.
        parent.children.append(self)

    def __repr__(self):
        column_names = [column.name for column in self.columns]
        return f"Table(name={self.name}, columns={column_names})"

    def get_column_names(self) -> Set[str]:
        """Return the names of this table's columns as a set of strings."""
        return {column.name for column in self.columns}
# Sample columns, created in the order the names appear on the left.
# c5 is not attached to any table defined in this section.
c1, c2, c3, c4, c5, eventTime = (
    Column(label) for label in ("c1", "c2", "c3", "c4", "c5", "eventTime")
)

# Sample lineage: raw_data -> cleaned_data -> final_data -> training_data.
# Passing `parent=` also registers each table in its parent's `children` list.
raw_data = Table(
    name="raw_data",
    columns={c1, c2, c3, c4, eventTime},
)
cleaned_data = Table(
    name="cleaned_data",
    columns={c1, c2, c3},
    parent=raw_data,
    filter_condition="eventTime BETWEEN '2023-01-01' AND '2023-02-01'",
)
final_data = Table(
    name="final_data",
    columns={c1, c2, c3},
    parent=cleaned_data,
)
training_data_correct = Table(
    name="training_data",
    columns={c1, c2},
    parent=final_data,
)
def validate_dag(tables: List[Table]) -> bool:
    """Check that the tables form a single-rooted hierarchy with consistent columns.

    Two rules are enforced:

    1. Exactly one table in ``tables`` has no parent (the root).
    2. Every table with a parent only uses column names that its immediate
       parent also has.

    Rule 2 is applied to every table reachable from the root through
    ``children`` links and, in addition, to every table in ``tables`` that
    is not reachable from the root, so a listed table is never skipped.

    Args:
        tables: The tables to validate.

    Returns:
        True when both rules hold, False otherwise. A diagnostic message is
        printed for the first violation found, and a success message is
        printed when validation passes.
    """
    # Identify the root node (node without a parent).
    root_tables = [table for table in tables if table.parent is None]
    if len(root_tables) != 1:
        print(f"Error: Expected exactly one root table, found {len(root_tables)}.")
        return False

    def has_consistent_columns(table: Table) -> bool:
        """Return True if the table's columns all exist in its immediate parent."""
        parent = table.parent
        if parent is None:
            return True
        missing_columns = table.get_column_names() - parent.get_column_names()
        if missing_columns:
            print(f"Error: Table '{table.name}' has columns {missing_columns} not found in its immediate parent '{parent.name}'.")
            return False
        return True

    # Walk the hierarchy iteratively (pre-order, children in insertion order)
    # so that very deep lineages cannot exhaust the interpreter's recursion
    # limit. Tracking visited tables by identity keeps the walk finite even
    # if `children` lists were mutated into a loop.
    visited = set()
    pending = [root_tables[0]]
    while pending:
        table = pending.pop()
        if id(table) in visited:
            continue
        visited.add(id(table))
        if not has_consistent_columns(table):
            return False
        # Reversed so the first child is popped (and therefore visited) first.
        pending.extend(reversed(table.children))

    # A listed table whose ancestry leads outside the root's subtree is never
    # reached by the walk above; check it directly rather than skipping it.
    for table in tables:
        if id(table) not in visited and not has_consistent_columns(table):
            return False

    print("DAG validation successful: Single root and column consistency maintained.")
    return True
# Scenario 1: a chain in which every table only narrows its parent's columns.
tables = [raw_data, cleaned_data, final_data, training_data_correct]
is_valid_dag = validate_dag(tables)
assert is_valid_dag, "The DAG should be valid."

# Scenario 2: training_data_incorrect deliberately requests c4, which
# final_data does not have, so validation is expected to fail.
training_data_incorrect = Table(
    name="training_data_incorrect",
    columns={c1, c2, c4},
    parent=final_data,
)
tables = [raw_data, cleaned_data, final_data, training_data_incorrect]
is_valid_dag = validate_dag(tables)
assert not is_valid_dag, "The DAG should be invalid."
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment