To create dynamic ETL (Extract, Transform, Load) pipelines with dynamic edges and nodes, using Pydantic for data validation, we will expand on the previously discussed abstracted LangGraph-like state machine. This ETL pipeline will dynamically adjust its flow based on the data passed through each node, allowing for flexible data processing scenarios.

Step 1: Define the Data Model and Node Functionality

First, define Pydantic models for the data that will flow through the ETL pipeline. These models ensure that each stage of the ETL process receives and produces data in the expected format.

from pydantic import BaseModel

class DataModel(BaseModel):
    # Generic payload passed between pipeline stages; replace this single
    # dict field with dataset-specific fields for stricter validation
    data: dict
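
For instance, constructing a DataModel from input that does not match the declared field raises a ValidationError, so malformed records fail fast before entering the pipeline. The snippet below is a minimal sketch of that behavior; the record contents are illustrative only.

from pydantic import ValidationError

# A well-formed payload parses cleanly
record = DataModel(data={"source": "api", "rows": 42})

# 'data' must be a dict, so this raises a ValidationError
try:
    DataModel(data="not a dict")
except ValidationError as exc:
    print(exc)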

Step 2: Enhance Node and Graph Definitions

Enhance the Node class to include dynamic behavior based on the input data, and adapt the earlier StateGraph into a DynamicETLGraph class that resolves edges and nodes at run time based on the execution context. A data-driven routing sketch follows the example functions below.

from typing import Callable, Dict, Optional

class Node(BaseModel):
    id: str
    # Transformation applied when this node runs
    process: Callable[[DataModel], DataModel]
    # Optional router: inspects the processed data and returns the id of the
    # next node, or None to end the pipeline
    next_step: Optional[Callable[[DataModel], Optional[str]]] = None

class DynamicETLGraph(BaseModel):
    nodes: Dict[str, Node] = {}

    def add_node(self, node: Node):
        self.nodes[node.id] = node

    def execute(self, start_node_id: str, initial_data: DataModel) -> DataModel:
        current_node_id = start_node_id
        current_data = initial_data

        while current_node_id:
            current_node = self.nodes.get(current_node_id)
            if not current_node:
                raise ValueError(f"Node {current_node_id} not found")

            # Process the current node
            current_data = current_node.process(current_data)

            # Determine the next step dynamically
            if current_node.next_step:
                current_node_id = current_node.next_step(current_data)
            else:
                break  # No next step defined, end of the pipeline

        return current_data

# Example node processing functions
def extract_function(input_data: DataModel) -> DataModel:
    # Implement the extraction logic here
    return input_data

def transform_function(input_data: DataModel) -> DataModel:
    # Implement the transformation logic here
    return input_data

# Example of dynamic next-step determination
def determine_next_step(input_data: DataModel) -> Optional[str]:
    # Logic to determine the next node based on input_data
    return "transform"  # or any other node id based on that logic

Step 3: Implement ETL Pipeline Execution

With the nodes defined and added to your graph, you can execute the ETL pipeline by specifying the starting node and the initial data. The pipeline dynamically adjusts its path based on the data processed at each stage and returns the final DataModel once it reaches a node with no next step.

# Initialize your ETL pipeline graph
etl_graph = DynamicETLGraph()

# Add nodes to your graph
etl_graph.add_node(Node(id="extract", process=extract_function, next_step=determine_next_step))
etl_graph.add_node(Node(id="transform", process=transform_function))

# Execute the pipeline starting from the 'extract' node
initial_data = DataModel(data={"sample": "data"})
final_data = etl_graph.execute(start_node_id="extract", initial_data=initial_data)
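
The same graph can also be grown incrementally. As a minimal sketch, the snippet below assumes a hypothetical "load" stage: it registers a load node and re-registers the transform node with a router that hands off to it.

# Hypothetical extension: a 'load' stage reached after the transform node
def load_function(input_data: DataModel) -> DataModel:
    # Implement the load logic here, e.g. writing to a target store
    return input_data

etl_graph.add_node(Node(id="load", process=load_function))
# Re-register 'transform' so it routes onward to 'load'
etl_graph.add_node(Node(id="transform", process=transform_function,
                        next_step=lambda data: "load"))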

This approach allows you to build flexible and dynamic ETL pipelines where the flow can adapt based on the data it processes. Each node in the pipeline can perform specific ETL tasks, and the decision on where the data flows next can be made dynamically, enabling complex data processing scenarios tailored to the needs of each dataset.
