View basic-prefect-etl-flow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student): | |
""" | |
This is a normal "business logic" function which is not a Prefect task. | |
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL. | |
:param grade: number of points on an exam | |
:param subject: school subject |
View basic-prefect-etl-flow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.environments.storage import Docker | |
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student): | |
""" | |
This is a normal "business logic" function which is not a Prefect task. | |
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL. | |
:param grade: number of points on an exam |
View dask-k8.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.environments.storage import Docker | |
from prefect.environments import DaskKubernetesEnvironment | |
from prefect import task, Flow | |
import random | |
from time import sleep | |
@task | |
def inc(x): | |
sleep(random.random() / 10) |
View trigger_and_wait_for_completion.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.models import DagModel, TaskInstance, DagRun | |
from airflow.utils.decorators import apply_defaults | |
from airflow.operators.dagrun_operator import TriggerDagRunOperator | |
from airflow.sensors.base_sensor_operator import BaseSensorOperator | |
from airflow.utils.db import provide_session | |
class WaitForCompletion(BaseSensorOperator): | |
""" | |
Waits for a different DAG or a task in a different DAG to complete |
View staging_area_example_USAGE.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from lib.operators.trigger_and_wait_for_completion import trigger_dag | |
with DAG(dag_id='business_logic', | |
default_args=None, # None only for demonstration purposes - this shouldn't be None | |
schedule_interval=None) as dag: | |
staging_area = trigger_dag(task_id='staging_area', | |
trigger_dag_id='staging_area', | |
wait_for_task='finish') | |
bus_logic_child_dag, wait_for_bus_logic = trigger_dag(task_id='business_logic_layer', |
View master_flow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Trigger child Flows from this parent Flow """ | |
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
with Flow("FlowRunTask_Example") as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', | |
project_name="InterFlow_Dependencies", | |
wait=True) | |
business_logic_layer = FlowRunTask(flow_name='business_logic_layer', |
View staging_area.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def stage_1(): | |
print("Executing stage_1") | |
@task(log_stdout=True) | |
def stage_2(): |
View business_logic_layer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def bus_logic_etl_1(): | |
print("Executing bus_logic_etl_1") | |
@task(log_stdout=True) | |
def bus_logic_etl_2(): |
View data_mart.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def data_mart_1(): | |
print("Executing data_mart_1") | |
@task(log_stdout=True) | |
def data_mart_2(): |
View master_flow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Trigger child Flows from this parent Flow """ | |
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
with Flow("MasterFlow") as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', | |
project_name="InterFlow_Dependencies", | |
wait=True) | |
business_logic_layer = FlowRunTask(flow_name='business_logic_layer', |
OlderNewer