This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student): | |
""" | |
This is a normal "business logic" function which is not a Prefect task. | |
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL. | |
:param grade: number of points on an exam | |
:param subject: school subject |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.environments.storage import Docker | |
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student): | |
""" | |
This is a normal "business logic" function which is not a Prefect task. | |
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL. | |
:param grade: number of points on an exam |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.environments.storage import Docker | |
from prefect.environments import DaskKubernetesEnvironment | |
from prefect import task, Flow | |
import random | |
from time import sleep | |
@task | |
def inc(x): | |
sleep(random.random() / 10) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.models import DagModel, TaskInstance, DagRun | |
from airflow.utils.decorators import apply_defaults | |
from airflow.operators.dagrun_operator import TriggerDagRunOperator | |
from airflow.sensors.base_sensor_operator import BaseSensorOperator | |
from airflow.utils.db import provide_session | |
class WaitForCompletion(BaseSensorOperator): | |
""" | |
Waits for a different DAG or a task in a different DAG to complete |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from lib.operators.trigger_and_wait_for_completion import trigger_dag | |
with DAG(dag_id='business_logic', | |
default_args=None, # None only for demonstration purposes - this shouldn't be None | |
schedule_interval=None) as dag: | |
staging_area = trigger_dag(task_id='staging_area', | |
trigger_dag_id='staging_area', | |
wait_for_task='finish') | |
bus_logic_child_dag, wait_for_bus_logic = trigger_dag(task_id='business_logic_layer', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Trigger child Flows from this parent Flow """ | |
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
with Flow("FlowRunTask_Example") as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', | |
project_name="InterFlow_Dependencies", | |
wait=True) | |
business_logic_layer = FlowRunTask(flow_name='business_logic_layer', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def stage_1(): | |
print("Executing stage_1") | |
@task(log_stdout=True) | |
def stage_2(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def bus_logic_etl_1(): | |
print("Executing bus_logic_etl_1") | |
@task(log_stdout=True) | |
def bus_logic_etl_2(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def data_mart_1(): | |
print("Executing data_mart_1") | |
@task(log_stdout=True) | |
def data_mart_2(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Trigger child Flows from this parent Flow """ | |
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
with Flow("MasterFlow") as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', | |
project_name="InterFlow_Dependencies", | |
wait=True) | |
business_logic_layer = FlowRunTask(flow_name='business_logic_layer', |
OlderNewer