This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student): | |
""" | |
This is a normal "business logic" function which is not a Prefect task. | |
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL. | |
:param grade: number of points on an exam | |
:param subject: school subject |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.environments.storage import Docker | |
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student): | |
""" | |
This is a normal "business logic" function which is not a Prefect task. | |
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL. | |
:param grade: number of points on an exam |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from lib.operators.trigger_and_wait_for_completion import trigger_dag | |
with DAG(dag_id='business_logic', | |
default_args=None, # None only for demonstration purposes - this shouldn't be None | |
schedule_interval=None) as dag: | |
staging_area = trigger_dag(task_id='staging_area', | |
trigger_dag_id='staging_area', | |
wait_for_task='finish') | |
bus_logic_child_dag, wait_for_bus_logic = trigger_dag(task_id='business_logic_layer', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Trigger child Flows from this parent Flow """ | |
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
with Flow("FlowRunTask_Example") as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', | |
project_name="InterFlow_Dependencies", | |
wait=True) | |
business_logic_layer = FlowRunTask(flow_name='business_logic_layer', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def stage_1(): | |
print("Executing stage_1") | |
@task(log_stdout=True) | |
def stage_2(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def bus_logic_etl_1(): | |
print("Executing bus_logic_etl_1") | |
@task(log_stdout=True) | |
def bus_logic_etl_2(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import task, Flow | |
@task(log_stdout=True) | |
def data_mart_1(): | |
print("Executing data_mart_1") | |
@task(log_stdout=True) | |
def data_mart_2(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Trigger child Flows from this parent Flow """ | |
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
with Flow("MasterFlow") as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', | |
project_name="InterFlow_Dependencies", | |
wait=True) | |
business_logic_layer = FlowRunTask(flow_name='business_logic_layer', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.tasks.prefect.flow_run import FlowRunTask | |
from prefect import Flow | |
from prefect.schedules import Schedule | |
from prefect.schedules.clocks import CronClock | |
schedule_parent_flow = Schedule(clocks=[CronClock("0 2 * * *")]) | |
with Flow("MasterFlow", schedule=schedule_parent_flow) as flow: | |
staging_area = FlowRunTask(flow_name='staging_area', |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect.schedules import Schedule | |
from prefect.schedules.clocks import CronClock | |
import pendulum | |
schedule = Schedule(clocks=[CronClock("0 2 * * *", | |
start_date=pendulum.datetime(2020, 8, 28, | |
tz="Europe/Berlin") | |
)]) | |
schedule.next(3) # to see next 3 schedules |
OlderNewer