Last active
November 11, 2024 16:32
-
-
Save jacopotagliabue/30d30566d6a9245aabbb28fe5d7d26bb to your computer and use it in GitHub Desktop.
Graph inference for FaaS planning (sample, generated Python code, before Kuzu)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Optional, Set | |
class Column:
    """A column, described only by its name.

    No ``__eq__`` or ``__hash__`` is defined, so two columns with the same
    name are still distinct objects when placed in a set.
    """

    def __init__(self, name: str) -> None:
        # The name is the only state a column carries.
        self.name = name

    def __repr__(self) -> str:
        return "Column(name={})".format(self.name)
class Table:
    """A node in the table lineage graph: a named set of columns with an optional parent.

    Creating a table with a ``parent`` also appends the new table to that
    parent's ``children`` list, so the graph can be walked in both directions.

    Args:
        name: Name of the table.
        columns: Set of Column objects exposed by the table (stored by reference).
        parent: The table this one is derived from, or ``None`` for a root.
        filter_condition: Optional filter expression stored as-is on the table.
    """

    def __init__(
        self,
        name: str,
        columns: Set["Column"],
        parent: Optional["Table"] = None,
        filter_condition: Optional[str] = None,
    ) -> None:
        self.name = name
        self.columns = columns  # Set of Column objects
        self.parent = parent
        self.filter_condition = filter_condition
        self.children: List["Table"] = []
        # Link this table as a child to its parent. The check is an explicit
        # ``is not None`` so it never depends on the truthiness of a Table.
        if parent is not None:
            parent.children.append(self)

    def __repr__(self) -> str:
        # Sets have no stable iteration order, so the names are sorted to keep
        # the representation reproducible between runs.
        column_names = sorted(col.name for col in self.columns)
        return f"Table(name={self.name}, columns={column_names})"

    def get_column_names(self) -> Set[str]:
        """Return the names of this table's columns as a set of strings."""
        return {col.name for col in self.columns}
# Sample columns shared by the tables below.
c1, c2, c3, c4, c5 = (Column(label) for label in ("c1", "c2", "c3", "c4", "c5"))
eventTime = Column("eventTime")

# Sample lineage: raw_data -> cleaned_data -> final_data -> training_data.
# Passing ``parent=`` registers each table in its parent's ``children`` list.
raw_data = Table(
    name="raw_data",
    columns={c1, c2, c3, c4, eventTime},
)
cleaned_data = Table(
    name="cleaned_data",
    columns={c1, c2, c3},
    parent=raw_data,
    filter_condition="eventTime BETWEEN '2023-01-01' AND '2023-02-01'",
)
final_data = Table(
    name="final_data",
    columns={c1, c2, c3},
    parent=cleaned_data,
)
training_data_correct = Table(
    name="training_data",
    columns={c1, c2},
    parent=final_data,
)
def _columns_within_parent(table: "Table") -> bool:
    """Return True when every column of *table* also exists in its immediate parent.

    A table without a parent passes trivially. On failure the offending
    column names are printed and False is returned.
    """
    if table.parent is None:
        return True
    missing_columns = table.get_column_names() - table.parent.get_column_names()
    if missing_columns:
        print(f"Error: Table '{table.name}' has columns {missing_columns} not found in its immediate parent '{table.parent.name}'.")
        return False
    return True


def validate_dag(tables: List["Table"]) -> bool:
    """Validate the DAG structure to check for a single root and column consistency.

    The root is the only table in ``tables`` without a parent. Starting from
    it, every descendant reachable through ``children`` is checked (including
    descendants that are not listed in ``tables``). Any listed table that the
    walk did not reach is then checked as well, so no listed table is skipped.

    Args:
        tables: The tables that make up the DAG.

    Returns:
        True when exactly one root exists and every checked table only uses
        columns present in its immediate parent; False otherwise. The outcome
        is also reported with ``print``.
    """
    # Identify the root node (node without a parent)
    root_tables = [table for table in tables if table.parent is None]
    if len(root_tables) != 1:
        print(f"Error: Expected exactly one root table, found {len(root_tables)}.")
        return False

    # Iterative pre-order walk: an explicit stack avoids hitting the
    # interpreter recursion limit on long parent -> child chains.
    visited = set()  # ids of tables already checked
    pending = [root_tables[0]]
    while pending:
        table = pending.pop()
        if id(table) in visited:
            continue
        visited.add(id(table))
        if not _columns_within_parent(table):
            return False
        # Reversed so children are popped, and errors reported, in list order.
        pending.extend(reversed(table.children))

    # Listed tables whose parent chain does not lead to the root were not
    # reached by the walk above; check them too instead of skipping them.
    for table in tables:
        if id(table) not in visited and not _columns_within_parent(table):
            return False

    print("DAG validation successful: Single root and column consistency maintained.")
    return True
# Scenario 1: every sample table only uses columns present in its parent.
tables = [raw_data, cleaned_data, final_data, training_data_correct]
is_valid_dag = validate_dag(tables)
assert is_valid_dag, "The DAG should be valid."

# Scenario 2: deliberately add c4, which is not in final_data, so the
# validation is expected to fail.
training_data_incorrect = Table(
    name="training_data_incorrect",
    columns={c1, c2, c4},
    parent=final_data,
)
tables = [raw_data, cleaned_data, final_data, training_data_incorrect]
is_valid_dag = validate_dag(tables)
assert not is_valid_dag, "The DAG should be invalid."
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment