Skip to content

Instantly share code, notes, and snippets.

Avatar

Anna Geller anna-anisienia

View GitHub Profile
@anna-anisienia
anna-anisienia / basic-prefect-etl-flow.py
Created Aug 24, 2020
Basic ETL pipeline with Prefect
View basic-prefect-etl-flow.py
from prefect import Flow, task
import pandas as pd
def score_check(grade, subject, student):
"""
This is a normal "business logic" function which is not a Prefect task.
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL.
:param grade: number of points on an exam
:param subject: school subject
@anna-anisienia
anna-anisienia / basic-prefect-etl-flow.py
Created Aug 24, 2020
Basic ETL with Prefect using Docker storage
View basic-prefect-etl-flow.py
from prefect.environments.storage import Docker
from prefect import Flow, task
import pandas as pd
def score_check(grade, subject, student):
"""
This is a normal "business logic" function which is not a Prefect task.
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL.
:param grade: number of points on an exam
View staging_area_example_USAGE.py
from airflow import DAG
from lib.operators.trigger_and_wait_for_completion import trigger_dag
with DAG(dag_id='business_logic',
default_args=None, # None only for demonstration purposes - this shouldn't be None
schedule_interval=None) as dag:
staging_area = trigger_dag(task_id='staging_area',
trigger_dag_id='staging_area',
wait_for_task='finish')
bus_logic_child_dag, wait_for_bus_logic = trigger_dag(task_id='business_logic_layer',
View master_flow.py
""" Trigger child Flows from this parent Flow """
from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow
with Flow("FlowRunTask_Example") as flow:
staging_area = FlowRunTask(flow_name='staging_area',
project_name="InterFlow_Dependencies",
wait=True)
business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
View staging_area.py
from prefect import task, Flow
# log_stdout=True asks Prefect to forward this task's print output to its logger.
@task(log_stdout=True)
def stage_1():
    """Print a marker line showing that stage_1 is executing."""
    print("Executing stage_1")
@task(log_stdout=True)
def stage_2():
View business_logic_layer.py
from prefect import task, Flow
# log_stdout=True asks Prefect to forward this task's print output to its logger.
@task(log_stdout=True)
def bus_logic_etl_1():
    """Print a marker line showing that bus_logic_etl_1 is executing."""
    print("Executing bus_logic_etl_1")
@task(log_stdout=True)
def bus_logic_etl_2():
View data_mart.py
from prefect import task, Flow
# log_stdout=True asks Prefect to forward this task's print output to its logger.
@task(log_stdout=True)
def data_mart_1():
    """Print a marker line showing that data_mart_1 is executing."""
    print("Executing data_mart_1")
@task(log_stdout=True)
def data_mart_2():
View master_flow.py
""" Trigger child Flows from this parent Flow """
from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow
with Flow("MasterFlow") as flow:
staging_area = FlowRunTask(flow_name='staging_area',
project_name="InterFlow_Dependencies",
wait=True)
business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
View master_flow_with_schedule.py
from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
# Schedule with a single cron clock: minute 0 of hour 2, every day.
schedule_parent_flow = Schedule(
    clocks=[
        CronClock("0 2 * * *"),
    ],
)
with Flow("MasterFlow", schedule=schedule_parent_flow) as flow:
staging_area = FlowRunTask(flow_name='staging_area',
View tz_specific_schedule.py
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
import pendulum
# Start date for the clock, created as a timezone-aware pendulum datetime.
berlin_start = pendulum.datetime(2020, 8, 28, tz="Europe/Berlin")

# Single cron clock (minute 0 of hour 2, every day) anchored at that start date.
# NOTE(review): presumably the cron expression is then evaluated in Berlin
# local time - confirm against Prefect's CronClock documentation.
schedule = Schedule(
    clocks=[CronClock("0 2 * * *", start_date=berlin_start)],
)

# The result is neither stored nor printed, so it is only visible when this
# line is run in an interactive session.
schedule.next(3)  # to see next 3 schedules